You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2016/10/03 13:25:08 UTC
phoenix git commit: PHOENIX-3159 CachingHTableFactory may close
HTable during eviction even if it is getting used for writing by another
thread
Repository: phoenix
Updated Branches:
refs/heads/master 2f51568a7 -> 4c0aeb0d5
PHOENIX-3159 CachingHTableFactory may close HTable during eviction even if it is getting used for writing by another thread
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4c0aeb0d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4c0aeb0d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4c0aeb0d
Branch: refs/heads/master
Commit: 4c0aeb0d530852bc12e5fcd930e336fb19434397
Parents: 2f51568
Author: Ankit Singhal <an...@gmail.com>
Authored: Mon Oct 3 18:54:51 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Mon Oct 3 18:54:51 2016 +0530
----------------------------------------------------------------------
.../hbase/index/table/CachingHTableFactory.java | 104 ++++++++++++++++---
.../index/table/CoprocessorHTableFactory.java | 6 ++
.../hbase/index/table/HTableFactory.java | 4 +-
.../hbase/index/write/IndexWriterUtils.java | 3 +
.../write/ParallelWriterIndexCommitter.java | 21 ++--
.../TrackingParallelWriterIndexCommitter.java | 18 ++--
.../hbase/index/write/FakeTableFactory.java | 9 +-
.../index/write/TestCachingHTableFactory.java | 37 ++++---
.../hbase/index/write/TestIndexWriter.java | 24 +++--
.../index/write/TestParalleIndexWriter.java | 16 ++-
.../write/TestParalleWriterIndexCommitter.java | 15 ++-
11 files changed, 197 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
index 0c06e2b..d0df5b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
@@ -17,18 +17,30 @@
*/
package org.apache.phoenix.hbase.index.table;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.HTABLE_KEEP_ALIVE_KEY;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY;
+
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.execute.DelegateHTable;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import com.google.common.annotations.VisibleForTesting;;
+
/**
* A simple cache that just uses usual GC mechanisms to cleanup unused {@link HTableInterface}s.
* When requesting an {@link HTableInterface} via {@link #getTable}, you may get the same table as
@@ -47,7 +59,7 @@ public class CachingHTableFactory implements HTableFactory {
public class HTableInterfaceLRUMap extends LRUMap {
public HTableInterfaceLRUMap(int cacheSize) {
- super(cacheSize);
+ super(cacheSize, true);
}
@Override
@@ -58,12 +70,18 @@ public class CachingHTableFactory implements HTableFactory {
+ " because it was evicted from the cache.");
}
try {
- table.close();
+ synchronized (this) { // the whole operation of closing and checking the count should be atomic
+ // and should not conflict with getTable()
+ if (((CachedHTableWrapper)table).getReferenceCount() <= 0) {
+ table.close();
+ return true;
+ }
+ }
} catch (IOException e) {
LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName())
+ " ignoring since being removed from queue.");
}
- return true;
+ return false;
}
}
@@ -73,38 +91,94 @@ public class CachingHTableFactory implements HTableFactory {
private static final Log LOG = LogFactory.getLog(CachingHTableFactory.class);
private static final String CACHE_SIZE_KEY = "index.tablefactory.cache.size";
- private static final int DEFAULT_CACHE_SIZE = 10;
+ private static final int DEFAULT_CACHE_SIZE = 1000;
private HTableFactory delegate;
@SuppressWarnings("rawtypes")
Map openTables;
+ private ThreadPoolExecutor pool;
- public CachingHTableFactory(HTableFactory tableFactory, Configuration conf) {
- this(tableFactory, getCacheSize(conf));
+ public CachingHTableFactory(HTableFactory tableFactory, Configuration conf, RegionCoprocessorEnvironment env) {
+ this(tableFactory, getCacheSize(conf), env);
}
- public CachingHTableFactory(HTableFactory factory, int cacheSize) {
+ public CachingHTableFactory(HTableFactory factory, int cacheSize, RegionCoprocessorEnvironment env) {
this.delegate = factory;
openTables = new HTableInterfaceLRUMap(cacheSize);
+ this.pool = new ThreadPoolExecutor(1,
+ env.getConfiguration().getInt(INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY, Integer.MAX_VALUE),
+ env.getConfiguration().getInt(HTABLE_KEEP_ALIVE_KEY, 60), TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("CachedHtables"));
+ pool.allowCoreThreadTimeOut(true);
}
-
+
@Override
@SuppressWarnings("unchecked")
- public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+ public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException {
ImmutableBytesPtr tableBytes = new ImmutableBytesPtr(tablename);
synchronized (openTables) {
- HTableInterface table = (HTableInterface) openTables.get(tableBytes);
+ CachedHTableWrapper table = (CachedHTableWrapper) openTables.get(tableBytes);
if (table == null) {
- table = delegate.getTable(tablename);
+ table = new CachedHTableWrapper(delegate.getTable(tablename, pool));
openTables.put(tableBytes, table);
}
+ table.incrementReferenceCount();
return table;
}
}
@Override
- public void shutdown() {
- this.delegate.shutdown();
- }
+ public void shutdown() {
+ this.delegate.shutdown();
+ this.pool.shutdown();
+ try {
+ boolean terminated = false;
+ do {
+ // wait until the pool has terminated
+ terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
+ } while (!terminated);
+ } catch (InterruptedException e) {
+ this.pool.shutdownNow();
+ LOG.warn("waitForTermination interrupted");
+ }
+ }
+
+ public static class CachedHTableWrapper extends DelegateHTable {
+
+ private AtomicInteger referenceCount = new AtomicInteger();
+
+ public CachedHTableWrapper(HTableInterface table) {
+ super(table);
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (getReferenceCount() > 0) {
+ this.referenceCount.decrementAndGet();
+ } else {
+ // During LRU eviction
+ super.close();
+ }
+ }
+
+ public void incrementReferenceCount() {
+ this.referenceCount.incrementAndGet();
+ }
+
+ public int getReferenceCount() {
+ return this.referenceCount.get();
+ }
+
+ }
+
+ @Override
+ public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+ return getTable(tablename, this.pool);
+ }
+
+ @VisibleForTesting
+ public ThreadPoolExecutor getPool(){
+ return this.pool;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index ded618d..45e271d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.hbase.index.table;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
@@ -36,6 +37,11 @@ public class CoprocessorHTableFactory implements HTableFactory {
public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()));
}
+
+ @Override
+ public HTableInterface getTable(ImmutableBytesPtr tablename,ExecutorService pool) throws IOException {
+ return this.e.getTable(TableName.valueOf(tablename.copyBytesIfNecessary()), pool);
+ }
@Override
public void shutdown() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
index bef3d34..e6a2e60 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
@@ -19,9 +19,9 @@
package org.apache.phoenix.hbase.index.table;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.client.HTableInterface;
-
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
public interface HTableFactory {
@@ -29,4 +29,6 @@ public interface HTableFactory {
public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException;
public void shutdown();
+
+ public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index b8b0079..6eb657d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -54,6 +54,9 @@ public class IndexWriterUtils {
/** Configuration key that HBase uses to set the max number of threads for an HTable */
public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
+ public static final String INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY = "phoenix.index.writes.threads.max";
+ public static final String HTABLE_KEEP_ALIVE_KEY = "hbase.htable.threads.keepalivetime";
+
private IndexWriterUtils() {
// private ctor for utilites
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index dd30db5..1549d26 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -21,14 +21,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
@@ -41,7 +37,6 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
import com.google.common.collect.Multimap;
@@ -59,7 +54,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
- private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
+ public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
private HTableFactory factory;
@@ -84,7 +79,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent,
- CachingHTableFactory.getCacheSize(conf));
+ CachingHTableFactory.getCacheSize(conf),env);
this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
}
@@ -93,8 +88,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
* <p>
* Exposed for TESTING
*/
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
- this.factory = new CachingHTableFactory(factory, cacheSize);
+ void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize, RegionCoprocessorEnvironment env) {
+ this.factory = new CachingHTableFactory(factory, cacheSize, env);
this.pool = new QuickFailingTaskRunner(pool);
this.stopped = stop;
}
@@ -151,6 +146,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
if (LOG.isDebugEnabled()) {
LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
}
+ HTableInterface table = null;
try {
if (allowLocalUpdates
&& env != null
@@ -168,7 +164,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
}
}
}
- HTableInterface table = factory.getTable(tableReference.get());
+ table = factory.getTable(tableReference.get());
throwFailureIfDone();
table.batch(mutations);
} catch (SingleIndexWriteFailureException e) {
@@ -180,6 +176,11 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
Thread.currentThread().interrupt();
throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
}
+ finally{
+ if (table != null) {
+ table.close();
+ }
+ }
return null;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
index f3888ed..4f1a076 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -23,13 +23,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.phoenix.hbase.index.CapturingAbortable;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
@@ -48,7 +45,6 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.MetaDataUtil;
import com.google.common.collect.Multimap;
@@ -95,7 +91,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent,
- CachingHTableFactory.getCacheSize(conf));
+ CachingHTableFactory.getCacheSize(conf), env);
}
/**
@@ -103,9 +99,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
* <p>
* Exposed for TESTING
*/
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
+ void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize,
+ RegionCoprocessorEnvironment env) {
this.pool = new WaitForCompletionTaskRunner(pool);
- this.factory = new CachingHTableFactory(factory, cacheSize);
+ this.factory = new CachingHTableFactory(factory, cacheSize, env);
this.abortable = new CapturingAbortable(abortable);
this.stopped = stop;
}
@@ -148,6 +145,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
@SuppressWarnings("deprecation")
@Override
public Boolean call() throws Exception {
+ HTableInterface table = null;
try {
// this may have been queued, but there was an abort/stop so we try to early exit
throwFailureIfDone();
@@ -172,7 +170,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
}
- HTableInterface table = factory.getTable(tableReference.get());
+ table = factory.getTable(tableReference.get());
throwFailureIfDone();
table.batch(mutations);
} catch (InterruptedException e) {
@@ -181,6 +179,10 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
throw e;
} catch (Exception e) {
throw e;
+ } finally {
+ if (table != null) {
+ table.close();
+ }
}
return Boolean.TRUE;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
index 2b6be18..4483a7f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/FakeTableFactory.java
@@ -20,9 +20,9 @@ package org.apache.phoenix.hbase.index.write;
import java.io.IOException;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.client.HTableInterface;
-
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -41,11 +41,16 @@ class FakeTableFactory implements HTableFactory {
@Override
public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
- return this.tables.get(tablename);
+ return getTable(tablename, null);
}
@Override
public void shutdown() {
shutdown = true;
}
+
+ @Override
+ public HTableInterface getTable(ImmutableBytesPtr tablename, ExecutorService pool) throws IOException {
+ return this.tables.get(tablename);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
index adf82f3..93ac3a6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestCachingHTableFactory.java
@@ -17,14 +17,18 @@
*/
package org.apache.phoenix.hbase.index.write;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
-import org.mockito.Mockito;
-
import org.apache.phoenix.hbase.index.table.CachingHTableFactory;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.junit.Test;
+import org.mockito.Mockito;
public class TestCachingHTableFactory {
@@ -32,27 +36,34 @@ public class TestCachingHTableFactory {
public void testCacheCorrectlyExpiresTable() throws Exception {
// setup the mocks for the tables we will request
HTableFactory delegate = Mockito.mock(HTableFactory.class);
+ RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+ Configuration conf =new Configuration();
+ Mockito.when(e.getConfiguration()).thenReturn(conf);
+ Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
ImmutableBytesPtr t1 = new ImmutableBytesPtr(Bytes.toBytes("t1"));
ImmutableBytesPtr t2 = new ImmutableBytesPtr(Bytes.toBytes("t2"));
ImmutableBytesPtr t3 = new ImmutableBytesPtr(Bytes.toBytes("t3"));
HTableInterface table1 = Mockito.mock(HTableInterface.class);
HTableInterface table2 = Mockito.mock(HTableInterface.class);
HTableInterface table3 = Mockito.mock(HTableInterface.class);
- Mockito.when(delegate.getTable(t1)).thenReturn(table1);
- Mockito.when(delegate.getTable(t2)).thenReturn(table2);
- Mockito.when(delegate.getTable(t3)).thenReturn(table3);
+
// setup our factory with a cache size of 2
- CachingHTableFactory factory = new CachingHTableFactory(delegate, 2);
- factory.getTable(t1);
- factory.getTable(t2);
- factory.getTable(t3);
+ CachingHTableFactory factory = new CachingHTableFactory(delegate, 2, e);
+ Mockito.when(delegate.getTable(t1,factory.getPool())).thenReturn(table1);
+ Mockito.when(delegate.getTable(t2,factory.getPool())).thenReturn(table2);
+ Mockito.when(delegate.getTable(t3,factory.getPool())).thenReturn(table3);
+
+ HTableInterface ft1 =factory.getTable(t1);
+ HTableInterface ft2 =factory.getTable(t2);
+ ft1.close();
+ HTableInterface ft3 = factory.getTable(t3);
// get the same table a second time, after it has gone out of cache
factory.getTable(t1);
- Mockito.verify(delegate, Mockito.times(2)).getTable(t1);
- Mockito.verify(delegate, Mockito.times(1)).getTable(t2);
- Mockito.verify(delegate, Mockito.times(1)).getTable(t3);
+ Mockito.verify(delegate, Mockito.times(2)).getTable(t1,factory.getPool());
+ Mockito.verify(delegate, Mockito.times(1)).getTable(t2,factory.getPool());
+ Mockito.verify(delegate, Mockito.times(1)).getTable(t3,factory.getPool());
Mockito.verify(table1).close();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index 8f576cf..76ea933 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -29,6 +29,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -90,6 +91,10 @@ public class TestIndexWriter {
LOG.info("Current thread is interrupted: " + Thread.interrupted());
Abortable abort = new StubAbortable();
Stoppable stop = Mockito.mock(Stoppable.class);
+ RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+ Configuration conf =new Configuration();
+ Mockito.when(e.getConfiguration()).thenReturn(conf);
+ Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
ExecutorService exec = Executors.newFixedThreadPool(1);
Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
FakeTableFactory factory = new FakeTableFactory(tables);
@@ -117,7 +122,7 @@ public class TestIndexWriter {
// setup the writer and failure policy
ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
- committer.setup(factory, exec, abort, stop, 2);
+ committer.setup(factory, exec, abort, stop, 2, e);
KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
policy.setup(stop, abort);
IndexWriter writer = new IndexWriter(committer, policy);
@@ -164,7 +169,10 @@ public class TestIndexWriter {
Mockito.when(table.batch(Mockito.anyList())).thenThrow(
new IOException("Intentional IOException for failed first write."));
Mockito.when(table.getTableName()).thenReturn(tableName);
-
+ RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+ Configuration conf =new Configuration();
+ Mockito.when(e.getConfiguration()).thenReturn(conf);
+ Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
// second table just blocks to make sure that the abort propagates to the third task
final CountDownLatch waitOnAbortedLatch = new CountDownLatch(1);
final boolean[] failed = new boolean[] { false };
@@ -190,15 +198,15 @@ public class TestIndexWriter {
tables.put(new ImmutableBytesPtr(tableName2), table2);
ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
- committer.setup(factory, exec, abort, stop, 2);
+ committer.setup(factory, exec, abort, stop, 2, e);
KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
policy.setup(stop, abort);
IndexWriter writer = new IndexWriter(committer, policy);
try {
writer.write(indexUpdates);
fail("Should not have successfully completed all index writes");
- } catch (SingleIndexWriteFailureException e) {
- LOG.info("Correctly got a failure to reach the index", e);
+ } catch (SingleIndexWriteFailureException s) {
+ LOG.info("Correctly got a failure to reach the index", s);
// should have correctly gotten the correct abort, so let the next task execute
waitOnAbortedLatch.countDown();
}
@@ -223,6 +231,10 @@ public class TestIndexWriter {
// single thread factory so the older request gets queued
ExecutorService exec = Executors.newFixedThreadPool(1);
Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
+ RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+ Configuration conf =new Configuration();
+ Mockito.when(e.getConfiguration()).thenReturn(conf);
+ Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
FakeTableFactory factory = new FakeTableFactory(tables);
byte[] tableName = this.testName.getTableName();
@@ -257,7 +269,7 @@ public class TestIndexWriter {
// setup the writer
ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
- committer.setup(factory, exec, abort, stop, 2);
+ committer.setup(factory, exec, abort, stop, 2, e );
KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
policy.setup(stop, abort);
final IndexWriter writer = new IndexWriter(committer, policy);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
index 1f1e37e..ab88cd2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
@@ -22,16 +22,19 @@ import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.hbase.index.StubAbortable;
@@ -57,13 +60,17 @@ public class TestParalleIndexWriter {
@Test
public void testCorrectlyCleansUpResources() throws Exception{
ExecutorService exec = Executors.newFixedThreadPool(1);
+ RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+ Configuration conf =new Configuration();
+ Mockito.when(e.getConfiguration()).thenReturn(conf);
+ Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
FakeTableFactory factory = new FakeTableFactory(
Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
Abortable mockAbort = Mockito.mock(Abortable.class);
Stoppable mockStop = Mockito.mock(Stoppable.class);
// create a simple writer
- writer.setup(factory, exec, mockAbort, mockStop, 1);
+ writer.setup(factory, exec, mockAbort, mockStop, 1,e);
// stop the writer
writer.stop(this.test.getTableNameString() + " finished");
assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
@@ -82,7 +89,10 @@ public class TestParalleIndexWriter {
Map<ImmutableBytesPtr, HTableInterface> tables =
new HashMap<ImmutableBytesPtr, HTableInterface>();
FakeTableFactory factory = new FakeTableFactory(tables);
-
+ RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+ Configuration conf =new Configuration();
+ Mockito.when(e.getConfiguration()).thenReturn(conf);
+ Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
ImmutableBytesPtr tableName = new ImmutableBytesPtr(this.test.getTableName());
Put m = new Put(row);
m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
@@ -107,7 +117,7 @@ public class TestParalleIndexWriter {
// setup the writer and failure policy
ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
- writer.setup(factory, exec, abort, stop, 1);
+ writer.setup(factory, exec, abort, stop, 1, e);
writer.write(indexUpdates, true);
assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
completed[0]);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4c0aeb0d/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
index 8eece3b..219f615 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
@@ -22,16 +22,19 @@ import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.hbase.index.StubAbortable;
@@ -62,8 +65,12 @@ public class TestParalleWriterIndexCommitter {
ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
Abortable mockAbort = Mockito.mock(Abortable.class);
Stoppable mockStop = Mockito.mock(Stoppable.class);
+ RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+ Configuration conf =new Configuration();
+ Mockito.when(e.getConfiguration()).thenReturn(conf);
+ Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
// create a simple writer
- writer.setup(factory, exec, mockAbort, mockStop, 1);
+ writer.setup(factory, exec, mockAbort, mockStop, 1, e);
// stop the writer
writer.stop(this.test.getTableNameString() + " finished");
assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
@@ -77,6 +84,10 @@ public class TestParalleWriterIndexCommitter {
LOG.info("Starting " + test.getTableNameString());
LOG.info("Current thread is interrupted: " + Thread.interrupted());
Abortable abort = new StubAbortable();
+ RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
+ Configuration conf =new Configuration();
+ Mockito.when(e.getConfiguration()).thenReturn(conf);
+ Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
Stoppable stop = Mockito.mock(Stoppable.class);
ExecutorService exec = Executors.newFixedThreadPool(1);
Map<ImmutableBytesPtr, HTableInterface> tables =
@@ -107,7 +118,7 @@ public class TestParalleWriterIndexCommitter {
// setup the writer and failure policy
ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
- writer.setup(factory, exec, abort, stop, 1);
+ writer.setup(factory, exec, abort, stop, 1, e);
writer.write(indexUpdates, true);
assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
completed[0]);