You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by go...@apache.org on 2017/05/18 00:16:40 UTC
incubator-tephra git commit: TEPHRA-152 Using ReferenceCounting for
TransactionStateCache refresh thread, so that it can be stopped
Repository: incubator-tephra
Updated Branches:
refs/heads/master 3af3da869 -> 9b63985fc
TEPHRA-152 Using ReferenceCounting for TransactionStateCache refresh thread, so that it can be stopped
This closes #41 from GitHub.
Signed-off-by: Gokul Gunasekaran <go...@cask.co>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/9b63985f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/9b63985f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/9b63985f
Branch: refs/heads/master
Commit: 9b63985fcfbe25469270c77d2bfb17e21e27ce5c
Parents: 3af3da8
Author: Gokul Gunasekaran <go...@cask.co>
Authored: Tue Mar 21 15:37:48 2017 -0700
Committer: Gokul Gunasekaran <go...@cask.co>
Committed: Wed May 17 17:16:32 2017 -0700
----------------------------------------------------------------------
.../tephra/coprocessor/CacheSupplier.java | 43 +++++++++
.../coprocessor/ReferenceCountedSupplier.java | 94 ++++++++++++++++++++
.../TransactionStateCacheSupplier.java | 43 +++++----
.../hbase/coprocessor/TransactionProcessor.java | 15 +++-
.../hbase/txprune/PruneUpperBoundWriter.java | 4 +
.../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++----------
.../hbase/txprune/InvalidListPruneTest.java | 11 ++-
.../hbase/coprocessor/TransactionProcessor.java | 12 ++-
.../hbase/txprune/PruneUpperBoundWriter.java | 4 +
.../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++----------
.../hbase/txprune/InvalidListPruneTest.java | 11 ++-
.../hbase/coprocessor/TransactionProcessor.java | 15 +++-
.../hbase/txprune/PruneUpperBoundWriter.java | 4 +
.../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++----------
.../hbase/txprune/InvalidListPruneTest.java | 11 ++-
.../hbase/coprocessor/TransactionProcessor.java | 15 +++-
.../hbase/txprune/PruneUpperBoundWriter.java | 4 +
.../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++----------
.../hbase/txprune/InvalidListPruneTest.java | 11 ++-
.../hbase/coprocessor/TransactionProcessor.java | 15 +++-
.../hbase/txprune/PruneUpperBoundWriter.java | 4 +
.../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++----------
.../hbase/txprune/InvalidListPruneTest.java | 11 ++-
23 files changed, 357 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java
new file mode 100644
index 0000000..db93965
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.coprocessor;
+
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Service;
+
+/**
+ * Provides ability to get and release objects
+ *
+ * @param <T> type of the object supplied
+ */
+public interface CacheSupplier<T extends Service> extends Supplier<T> {
+
+ /**
+ * @return an instance of T. The first call starts the service; subsequent calls return a reference to the
+ * same instance
+ */
+ @Override
+ T get();
+
+ /**
+ * Release the object obtained through {@code Supplier#get()}. If this is the last release call, then the service will
+ * be stopped.
+ */
+ void release();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java
new file mode 100644
index 0000000..a0fa7ad
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.coprocessor;
+
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Reference counts the {@link Service} and manages the lifecycle of the {@link Service} instance.
+ *
+ * @param <T> type of {@link Service} that is reference counted
+ */
+public class ReferenceCountedSupplier<T extends Service> {
+ private static final Log LOG = LogFactory.getLog(ReferenceCountedSupplier.class);
+
+ private final AtomicReference<T> instance = new AtomicReference<>(null);
+ private final AtomicInteger refCount = new AtomicInteger(0);
+ private final Object lock = new Object();
+
+ private final String instanceName;
+
+ public ReferenceCountedSupplier(String instanceName) {
+ this.instanceName = instanceName;
+ }
+
+ public T getOrCreate(Supplier<T> instanceSupplier) {
+ synchronized (lock) {
+ if (instance.get() == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Creating and starting Service %s.", instanceName));
+ }
+
+ // Instance has not been instantiated
+ T serviceInstance = instanceSupplier.get();
+ instance.set(serviceInstance);
+ serviceInstance.start();
+ }
+ int newCount = refCount.incrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Incrementing reference count for Service %s: %d", instanceName, newCount));
+ }
+ return instance.get();
+ }
+ }
+
+ public void release() {
+ synchronized (lock) {
+ if (refCount.get() <= 0) {
+ LOG.warn(String.format("Reference Count for Service %s is already zero.", instanceName));
+ return;
+ }
+
+ int newCount = refCount.decrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Decrementing reference count for Service %s: %d", instanceName, newCount));
+ }
+
+ if (newCount == 0) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Reference Count for Service is 0. Stopping Service %s.", instanceName));
+ }
+
+ Service serviceInstance = instance.get();
+ serviceInstance.stopAndWait();
+ instance.set(null);
+ } catch (Exception ex) {
+ LOG.warn(String.format("Exception while trying to stop Service %s.", instanceName), ex);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
index d19da36..db0ca50 100644
--- a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
+++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java
@@ -24,31 +24,40 @@ import org.apache.hadoop.conf.Configuration;
/**
* Supplies instances of {@link TransactionStateCache} implementations.
*/
-public class TransactionStateCacheSupplier implements Supplier<TransactionStateCache> {
- protected static volatile TransactionStateCache instance;
- protected static Object lock = new Object();
+public class TransactionStateCacheSupplier implements CacheSupplier<TransactionStateCache> {
- protected final Configuration conf;
+ private static final ReferenceCountedSupplier<TransactionStateCache> referenceCountedSupplier =
+ new ReferenceCountedSupplier<>(TransactionStateCache.class.getSimpleName());
- public TransactionStateCacheSupplier(Configuration conf) {
- this.conf = conf;
+ private final Supplier<TransactionStateCache> supplier;
+
+ public TransactionStateCacheSupplier(Supplier<TransactionStateCache> supplier) {
+ this.supplier = supplier;
+ }
+
+ public TransactionStateCacheSupplier(final Configuration conf) {
+ this.supplier = new Supplier<TransactionStateCache>() {
+ @Override
+ public TransactionStateCache get() {
+ TransactionStateCache transactionStateCache = new TransactionStateCache();
+ transactionStateCache.setConf(conf);
+ return transactionStateCache;
+ }
+ };
}
/**
* Returns a singleton instance of the transaction state cache, performing lazy initialization if necessary.
- * @return A shared instance of the transaction state cache.
+ *
+ * @return A shared instance of the transaction state cache
*/
@Override
public TransactionStateCache get() {
- if (instance == null) {
- synchronized (lock) {
- if (instance == null) {
- instance = new TransactionStateCache();
- instance.setConf(conf);
- instance.start();
- }
- }
- }
- return instance;
+ return referenceCountedSupplier.getOrCreate(supplier);
+ }
+
+ @Override
+ public void release() {
+ referenceCountedSupplier.release();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index d2402a6..10ecfa4 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
package org.apache.tephra.hbase.coprocessor;
-import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
private volatile CompactionState compactionState;
+ private CacheSupplier<TransactionStateCache> cacheSupplier;
protected volatile Boolean pruneEnable;
protected volatile Long txMaxLifetimeMillis;
@@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
public void start(CoprocessorEnvironment e) throws IOException {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
- Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+ this.cacheSupplier = getTransactionStateCacheSupplier(env);
this.cache = cacheSupplier.get();
HTableDescriptor tableDesc = env.getRegion().getTableDesc();
@@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver {
return env.getConfiguration();
}
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
return new TransactionStateCacheSupplier(env.getConfiguration());
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- resetPruneState();
+ try {
+ resetPruneState();
+ } finally {
+ if (cacheSupplier != null) {
+ cacheSupplier.release();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 1c26ef1..5e0d435 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ if (flushThread.isAlive()) {
+ flushThread.interrupt();
+ flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
/**
* Supplies instances of {@link PruneUpperBoundWriter} implementations.
*/
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
- private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
- private static volatile PruneUpperBoundWriter instance;
- private static volatile int refCount = 0;
- private static final Object lock = new Object();
+ private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+ new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
- private final TableName tableName;
- private final DataJanitorState dataJanitorState;
- private final long pruneFlushInterval;
+ private final Supplier<PruneUpperBoundWriter> supplier;
- public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
- long pruneFlushInterval) {
- this.tableName = tableName;
- this.dataJanitorState = dataJanitorState;
- this.pruneFlushInterval = pruneFlushInterval;
+ public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+ final long pruneFlushInterval) {
+ this.supplier = new Supplier<PruneUpperBoundWriter>() {
+ @Override
+ public PruneUpperBoundWriter get() {
+ return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+ }
+ };
}
@Override
public PruneUpperBoundWriter get() {
- synchronized (lock) {
- if (instance == null) {
- instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
- instance.startAndWait();
- }
- refCount++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
- return instance;
- }
+ return referenceCountedSupplier.getOrCreate(supplier);
}
public void release() {
- synchronized (lock) {
- refCount--;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
-
- if (refCount == 0) {
- try {
- instance.stopAndWait();
- } catch (Exception ex) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
- } finally {
- // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
- if (instance.isAlive()) {
- try {
- instance.shutDown();
- } catch (Exception e) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
- }
- }
- instance = null;
- }
- }
- }
+ referenceCountedSupplier.release();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index e3f5c6b..91bbc1a 100644
--- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
package org.apache.tephra.hbase.txprune;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.hbase.AbstractHBaseTableTest;
import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -404,12 +404,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
@Override
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
- return new Supplier<TransactionStateCache>() {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new CacheSupplier<TransactionStateCache>() {
@Override
public TransactionStateCache get() {
return new InMemoryTransactionStateCache();
}
+
+ @Override
+ public void release() {
+ // no-op
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 84776cf..30b69a1 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +109,7 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
private volatile CompactionState compactionState;
+ private CacheSupplier<TransactionStateCache> cacheSupplier;
protected volatile Boolean pruneEnable;
protected volatile Long txMaxLifetimeMillis;
@@ -168,13 +170,19 @@ public class TransactionProcessor extends BaseRegionObserver {
return env.getConfiguration();
}
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
return new TransactionStateCacheSupplier(env.getConfiguration());
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- resetPruneState();
+ try {
+ resetPruneState();
+ } finally {
+ if (cacheSupplier != null) {
+ cacheSupplier.release();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 1c26ef1..5e0d435 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ if (flushThread.isAlive()) {
+ flushThread.interrupt();
+ flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
/**
* Supplies instances of {@link PruneUpperBoundWriter} implementations.
*/
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
- private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
- private static volatile PruneUpperBoundWriter instance;
- private static volatile int refCount = 0;
- private static final Object lock = new Object();
+ private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+ new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
- private final TableName tableName;
- private final DataJanitorState dataJanitorState;
- private final long pruneFlushInterval;
+ private final Supplier<PruneUpperBoundWriter> supplier;
- public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
- long pruneFlushInterval) {
- this.tableName = tableName;
- this.dataJanitorState = dataJanitorState;
- this.pruneFlushInterval = pruneFlushInterval;
+ public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+ final long pruneFlushInterval) {
+ this.supplier = new Supplier<PruneUpperBoundWriter>() {
+ @Override
+ public PruneUpperBoundWriter get() {
+ return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+ }
+ };
}
@Override
public PruneUpperBoundWriter get() {
- synchronized (lock) {
- if (instance == null) {
- instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
- instance.startAndWait();
- }
- refCount++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
- return instance;
- }
+ return referenceCountedSupplier.getOrCreate(supplier);
}
public void release() {
- synchronized (lock) {
- refCount--;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
-
- if (refCount == 0) {
- try {
- instance.stopAndWait();
- } catch (Exception ex) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
- } finally {
- // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
- if (instance.isAlive()) {
- try {
- instance.shutDown();
- } catch (Exception e) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
- }
- }
- instance = null;
- }
- }
- }
+ referenceCountedSupplier.release();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index e3f5c6b..91bbc1a 100644
--- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
package org.apache.tephra.hbase.txprune;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.hbase.AbstractHBaseTableTest;
import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -404,12 +404,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
@Override
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
- return new Supplier<TransactionStateCache>() {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new CacheSupplier<TransactionStateCache>() {
@Override
public TransactionStateCache get() {
return new InMemoryTransactionStateCache();
}
+
+ @Override
+ public void release() {
+ // no-op
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index b73bdc1..ca96052 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
package org.apache.tephra.hbase.coprocessor;
-import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
private volatile CompactionState compactionState;
+ private CacheSupplier<TransactionStateCache> cacheSupplier;
protected volatile Boolean pruneEnable;
protected volatile Long txMaxLifetimeMillis;
@@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
public void start(CoprocessorEnvironment e) throws IOException {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
- Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+ this.cacheSupplier = getTransactionStateCacheSupplier(env);
this.cache = cacheSupplier.get();
HTableDescriptor tableDesc = env.getRegion().getTableDesc();
@@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver {
return env.getConfiguration();
}
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
return new TransactionStateCacheSupplier(env.getConfiguration());
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- resetPruneState();
+ try {
+ resetPruneState();
+ } finally {
+ if (cacheSupplier != null) {
+ cacheSupplier.release();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 1c26ef1..5e0d435 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ if (flushThread.isAlive()) {
+ flushThread.interrupt();
+ flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
/**
* Supplies instances of {@link PruneUpperBoundWriter} implementations.
*/
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
- private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
- private static volatile PruneUpperBoundWriter instance;
- private static volatile int refCount = 0;
- private static final Object lock = new Object();
+ private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+ new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
- private final TableName tableName;
- private final DataJanitorState dataJanitorState;
- private final long pruneFlushInterval;
+ private final Supplier<PruneUpperBoundWriter> supplier;
- public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
- long pruneFlushInterval) {
- this.tableName = tableName;
- this.dataJanitorState = dataJanitorState;
- this.pruneFlushInterval = pruneFlushInterval;
+ public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+ final long pruneFlushInterval) {
+ this.supplier = new Supplier<PruneUpperBoundWriter>() {
+ @Override
+ public PruneUpperBoundWriter get() {
+ return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+ }
+ };
}
@Override
public PruneUpperBoundWriter get() {
- synchronized (lock) {
- if (instance == null) {
- instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
- instance.startAndWait();
- }
- refCount++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
- return instance;
- }
+ return referenceCountedSupplier.getOrCreate(supplier);
}
public void release() {
- synchronized (lock) {
- refCount--;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
-
- if (refCount == 0) {
- try {
- instance.stopAndWait();
- } catch (Exception ex) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
- } finally {
- // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
- if (instance.isAlive()) {
- try {
- instance.shutDown();
- } catch (Exception e) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
- }
- }
- instance = null;
- }
- }
- }
+ referenceCountedSupplier.release();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index c99904b..f2c1abc 100644
--- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
package org.apache.tephra.hbase.txprune;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.hbase.AbstractHBaseTableTest;
import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
@Override
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
- return new Supplier<TransactionStateCache>() {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new CacheSupplier<TransactionStateCache>() {
@Override
public TransactionStateCache get() {
return new InMemoryTransactionStateCache();
}
+
+ @Override
+ public void release() {
+ // no-op
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index f9bb35e..263ee98 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
package org.apache.tephra.hbase.coprocessor;
-import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
private volatile CompactionState compactionState;
+ private CacheSupplier<TransactionStateCache> cacheSupplier;
protected volatile Boolean pruneEnable;
protected volatile Long txMaxLifetimeMillis;
@@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
public void start(CoprocessorEnvironment e) throws IOException {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
- Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+ this.cacheSupplier = getTransactionStateCacheSupplier(env);
this.cache = cacheSupplier.get();
HTableDescriptor tableDesc = env.getRegion().getTableDesc();
@@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver {
return env.getConfiguration();
}
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
return new TransactionStateCacheSupplier(env.getConfiguration());
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- resetPruneState();
+ try {
+ resetPruneState();
+ } finally {
+ if (cacheSupplier != null) {
+ cacheSupplier.release();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 1c26ef1..5e0d435 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ if (flushThread.isAlive()) {
+ flushThread.interrupt();
+ flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
/**
* Supplies instances of {@link PruneUpperBoundWriter} implementations.
*/
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
- private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
- private static volatile PruneUpperBoundWriter instance;
- private static volatile int refCount = 0;
- private static final Object lock = new Object();
+ private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+ new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
- private final TableName tableName;
- private final DataJanitorState dataJanitorState;
- private final long pruneFlushInterval;
+ private final Supplier<PruneUpperBoundWriter> supplier;
- public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
- long pruneFlushInterval) {
- this.tableName = tableName;
- this.dataJanitorState = dataJanitorState;
- this.pruneFlushInterval = pruneFlushInterval;
+ public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+ final long pruneFlushInterval) {
+ this.supplier = new Supplier<PruneUpperBoundWriter>() {
+ @Override
+ public PruneUpperBoundWriter get() {
+ return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+ }
+ };
}
@Override
public PruneUpperBoundWriter get() {
- synchronized (lock) {
- if (instance == null) {
- instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
- instance.startAndWait();
- }
- refCount++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
- return instance;
- }
+ return referenceCountedSupplier.getOrCreate(supplier);
}
public void release() {
- synchronized (lock) {
- refCount--;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
-
- if (refCount == 0) {
- try {
- instance.stopAndWait();
- } catch (Exception ex) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
- } finally {
- // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
- if (instance.isAlive()) {
- try {
- instance.shutDown();
- } catch (Exception e) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
- }
- }
- instance = null;
- }
- }
- }
+ referenceCountedSupplier.release();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 07746d8..ac5e923 100644
--- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
package org.apache.tephra.hbase.txprune;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.hbase.AbstractHBaseTableTest;
import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
@Override
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
- return new Supplier<TransactionStateCache>() {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new CacheSupplier<TransactionStateCache>() {
@Override
public TransactionStateCache get() {
return new InMemoryTransactionStateCache();
}
+
+ @Override
+ public void release() {
+ // no-op
+ }
};
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 02e2dac..553f598 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
package org.apache.tephra.hbase.coprocessor;
-import com.google.common.base.Supplier;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
@@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionCodec;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
import org.apache.tephra.hbase.txprune.CompactionState;
@@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver {
private final TransactionCodec txCodec;
private TransactionStateCache cache;
private volatile CompactionState compactionState;
+ private CacheSupplier<TransactionStateCache> cacheSupplier;
protected volatile Boolean pruneEnable;
protected volatile Long txMaxLifetimeMillis;
@@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
public void start(CoprocessorEnvironment e) throws IOException {
if (e instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
- Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+ this.cacheSupplier = getTransactionStateCacheSupplier(env);
this.cache = cacheSupplier.get();
HTableDescriptor tableDesc = env.getRegion().getTableDesc();
@@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver {
return env.getConfiguration();
}
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
return new TransactionStateCacheSupplier(env.getConfiguration());
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
- resetPruneState();
+ try {
+ resetPruneState();
+ } finally {
+ if (cacheSupplier != null) {
+ cacheSupplier.release();
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
index 6bd8bab..677710b 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService {
if (flushThread != null) {
flushThread.interrupt();
flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ if (flushThread.isAlive()) {
+ flushThread.interrupt();
+ flushThread.join(TimeUnit.SECONDS.toMillis(1));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
index 98f3334..cb93fab 100644
--- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
+++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune;
import com.google.common.base.Supplier;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
/**
* Supplies instances of {@link PruneUpperBoundWriter} implementations.
*/
-public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> {
- private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class);
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
- private static volatile PruneUpperBoundWriter instance;
- private static volatile int refCount = 0;
- private static final Object lock = new Object();
+ private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+ new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
- private final TableName tableName;
- private final DataJanitorState dataJanitorState;
- private final long pruneFlushInterval;
+ private final Supplier<PruneUpperBoundWriter> supplier;
- public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState,
- long pruneFlushInterval) {
- this.tableName = tableName;
- this.dataJanitorState = dataJanitorState;
- this.pruneFlushInterval = pruneFlushInterval;
+ public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+ final long pruneFlushInterval) {
+ this.supplier = new Supplier<PruneUpperBoundWriter>() {
+ @Override
+ public PruneUpperBoundWriter get() {
+ return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+ }
+ };
}
@Override
public PruneUpperBoundWriter get() {
- synchronized (lock) {
- if (instance == null) {
- instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
- instance.startAndWait();
- }
- refCount++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
- return instance;
- }
+ return referenceCountedSupplier.getOrCreate(supplier);
}
public void release() {
- synchronized (lock) {
- refCount--;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount);
- }
-
- if (refCount == 0) {
- try {
- instance.stopAndWait();
- } catch (Exception ex) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex);
- } finally {
- // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again
- if (instance.isAlive()) {
- try {
- instance.shutDown();
- } catch (Exception e) {
- LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e);
- }
- }
- instance = null;
- }
- }
- }
+ referenceCountedSupplier.release();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 07746d8..ac5e923 100644
--- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -19,7 +19,6 @@
package org.apache.tephra.hbase.txprune;
-import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
@@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.CacheSupplier;
import org.apache.tephra.coprocessor.TransactionStateCache;
import org.apache.tephra.hbase.AbstractHBaseTableTest;
import org.apache.tephra.hbase.TransactionAwareHTable;
@@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1);
@Override
- protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
- return new Supplier<TransactionStateCache>() {
+ protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
+ return new CacheSupplier<TransactionStateCache>() {
@Override
public TransactionStateCache get() {
return new InMemoryTransactionStateCache();
}
+
+ @Override
+ public void release() {
+ // no-op
+ }
};
}