Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/26 20:08:06 UTC

[1/3] phoenix git commit: PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 b4e13730a -> 8565f3653


PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables

Conflicts:

	phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/0403840c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/0403840c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/0403840c

Branch: refs/heads/4.x-HBase-1.1
Commit: 0403840c62ab84f83c261957d13d5ffdfd7f2c0a
Parents: b4e1373
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 18:52:48 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 12:59:29 2017 -0700

----------------------------------------------------------------------
 .../index/PhoenixTransactionalIndexer.java      | 127 ++++++++++---------
 1 file changed, 64 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
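
For context, the core of this change moves the index writes out of preBatchMutate and into
postBatchMutateIndispensably, handing the updates between the two hooks through a ThreadLocal.
Below is a minimal sketch of that hand-off pattern, assuming (as the commit does) that HBase
invokes both hooks for a batch on the same handler thread; the class and method names are
illustrative, not the actual Phoenix code.

    // Illustrative sketch only: state built in the "pre" hook is consumed
    // in the "post" hook of the same batch mutation.
    public class BatchContextSketch {
        private static class BatchContext {
            final java.util.List<String> indexUpdates = new java.util.ArrayList<String>();
        }

        // Safe because both hooks for a given batch run on the same handler thread.
        private final ThreadLocal<BatchContext> batchContext = new ThreadLocal<BatchContext>();

        public void preBatch() {
            BatchContext context = new BatchContext();
            context.indexUpdates.add("index update for this batch");
            batchContext.set(context);
        }

        public void postBatch(boolean success) {
            BatchContext context = batchContext.get();
            if (context == null) return;
            try {
                if (success && !context.indexUpdates.isEmpty()) {
                    // write context.indexUpdates to the index tables here
                }
            } finally {
                batchContext.remove(); // avoid leaking state on pooled handler threads
            }
        }
    }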


http://git-wip-us.apache.org/repos/asf/phoenix/blob/0403840c/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 4b267a2..969378d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,11 @@
  */
 package org.apache.phoenix.index;
 
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,7 +34,6 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,14 +41,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -55,7 +57,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
@@ -104,11 +105,18 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
 
     private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class);
 
+    // Hack to get around not being able to save any state between
+    // coprocessor calls. TODO: remove after HBASE-18127 when available
+    private static class BatchMutateContext {
+        public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList();
+    }
+    
+    private ThreadLocal<BatchMutateContext> batchMutateContext =
+            new ThreadLocal<BatchMutateContext>();
+    
     private PhoenixIndexCodec codec;
     private IndexWriter writer;
     private boolean stopped;
-    private Map<Long, Collection<Pair<Mutation, byte[]>>> localUpdates =
-            new ConcurrentHashMap<Long, Collection<Pair<Mutation, byte[]>>>();
 
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
@@ -124,9 +132,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
          */
         clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
                 InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+        // lower the number of rpc retries.  We inherit config from HConnectionManager#setServerSideHConnectionRetries,
+        // which by default uses a multiplier of 10.  That is too many retries for our synchronous index writes
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
+                DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
+        clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration()
+            .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
         // setup the actual index writer
-        // setup the actual index writer
         // For transactional tables, we keep the index active upon a write failure
         // since we have the all versus none behavior for transactions.
         this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
@@ -161,6 +175,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             
         };
     }
+    
     @Override
     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -171,8 +186,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             return;
         }
 
+        BatchMutateContext context = new BatchMutateContext();
+        setBatchMutateContext(c, context);
+        
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
-        String tableName = c.getEnvironment().getRegion().getTableDesc().getNameAsString();
         PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
         byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
@@ -184,27 +201,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
-            Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
-            List<Pair<Mutation, byte[]>> localIndexUpdates = new ArrayList<Pair<Mutation, byte[]>>(indexUpdates.size());
-            while(indexUpdatesItr.hasNext()) {
-                Pair<Mutation, byte[]> next = indexUpdatesItr.next();
-                if(tableName.equals(Bytes.toString(next.getSecond()))) {
-                    localIndexUpdates.add(next);
-                    indexUpdatesItr.remove();
-                }
-            }
-            if(!localIndexUpdates.isEmpty()) {
-                byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID); 
-                localUpdates.put(Bytes.toLong(bs), localIndexUpdates);
-            }
-            current.addTimelineAnnotation("Built index updates, doing preStep");
-            TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
+            context.indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
 
-            // no index updates, so we are done
-            if (!indexUpdates.isEmpty()) {
-                this.writer.write(indexUpdates, false);
-            }
+            current.addTimelineAnnotation("Built index updates, doing preStep");
+            TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
         } catch (Throwable t) {
             String msg = "Failed to update index with entries:" + indexUpdates;
             LOG.error(msg, t);
@@ -213,43 +213,44 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
     }
 
     @Override
-    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
-            Durability durability) throws IOException {
-        Map<String,byte[]> updateAttributes = put.getAttributesMap();
-        PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes);
-        byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID);
-        if (bs == null || localUpdates.get(Bytes.toLong(bs)) == null) {
-            super.prePut(e, put, edit, durability);
-        } else {
-            Collection<Pair<Mutation, byte[]>> localIndexUpdates = localUpdates.remove(Bytes.toLong(bs));
-            try{
-                this.writer.write(localIndexUpdates, true);
-            } catch (Throwable t) {
-                String msg = "Failed to update index with entries:" + localIndexUpdates;
-                LOG.error(msg, t);
-                ServerUtil.throwIOException(msg, t);
-            }
+    public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
+        MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
+        BatchMutateContext context = getBatchMutateContext(c);
+        if (context == null || context.indexUpdates == null) {
+            return;
         }
-    }
+        // get the current span, or just use a null-span to avoid a bunch of if statements
+        try (TraceScope scope = Trace.startSpan("Starting to write index updates")) {
+            Span current = scope.getSpan();
+            if (current == null) {
+                current = NullSpan.INSTANCE;
+            }
 
-    @Override
-    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
-            WALEdit edit, Durability durability) throws IOException {
-        Map<String,byte[]> updateAttributes = delete.getAttributesMap();
-        PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(e.getEnvironment(),updateAttributes);
-        byte[] bs = indexMetaData.getAttributes().get(PhoenixIndexCodec.INDEX_UUID);
-        if (bs == null || localUpdates.get(Bytes.toLong(bs)) == null) {
-            super.postDelete(e, delete, edit, durability);
-        } else {
-            Collection<Pair<Mutation, byte[]>> localIndexUpdates = localUpdates.remove(Bytes.toLong(bs));
-            try{
-                this.writer.write(localIndexUpdates, true);
-            } catch (Throwable t) {
-                String msg = "Failed to update index with entries:" + localIndexUpdates;
-                LOG.error(msg, t);
-                ServerUtil.throwIOException(msg, t);
+            if (success) { // if miniBatchOp was successfully written, write index updates
+                if (!context.indexUpdates.isEmpty()) {
+                    this.writer.write(context.indexUpdates, true);
+                }
+                current.addTimelineAnnotation("Wrote index updates");
             }
-        }
+        } catch (Throwable t) {
+            String msg = "Failed to write index updates:" + context.indexUpdates;
+            LOG.error(msg, t);
+            ServerUtil.throwIOException(msg, t);
+        } finally {
+            removeBatchMutateContext(c);
+        }
+    }
+
+    private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
+        this.batchMutateContext.set(context);
+    }
+    
+    private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+        return this.batchMutateContext.get();
+    }
+    
+    private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+        this.batchMutateContext.remove();
     }
 
     private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
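
The start() hunk above also caps RPC retries and pause for the index writer's connection. A
standalone sketch of that tweak follows; the literal values are placeholders for the
INDEX_WRITER_RPC_* settings the diff reads, and the class name is made up for illustration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class IndexWriterConfigSketch {
        // Clone the server config so the override stays local to the index
        // writer, then lower retries/pause for synchronous index writes.
        public static Configuration forIndexWrites(Configuration serverConfig) {
            Configuration cloned = HBaseConfiguration.create(serverConfig);
            cloned.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); // placeholder value
            cloned.setInt(HConstants.HBASE_CLIENT_PAUSE, 5000);       // placeholder value
            return cloned;
        }
    }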


[2/3] phoenix git commit: PHOENIX-4233 IndexScrutiny test tool does not work for salted and shared index tables

Posted by ja...@apache.org.
PHOENIX-4233 IndexScrutiny test tool does not work for salted and shared index tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/41da53fb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/41da53fb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/41da53fb

Branch: refs/heads/4.x-HBase-1.1
Commit: 41da53fbdbcd2fe49d8b6189afe1d89055a27393
Parents: 0403840
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 22:33:28 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 12:59:55 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/util/IndexScrutinyIT.java    |  4 +-
 .../org/apache/phoenix/util/IndexScrutiny.java  | 47 ++++++++++++++------
 2 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
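
The fix teaches the scrutiny tool to skip the hidden leading PK columns that salted tables (a
salt-byte column) and shared view indexes (a view-index-id column) carry. A generic helper
mirroring the subList calls in the diff below, for illustration only:

    import java.util.List;

    public class ColumnOffsetSketch {
        // Skip the hidden leading columns before comparing data and index rows.
        public static <C> List<C> visibleColumns(List<C> columns, Integer bucketNum,
                Object viewIndexId) {
            int offset = 0;
            if (bucketNum != null) offset++;   // leading salt-byte column
            if (viewIndexId != null) offset++; // leading view-index-id column
            return offset > 0 ? columns.subList(offset, columns.size()) : columns;
        }
    }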


http://git-wip-us.apache.org/repos/asf/phoenix/blob/41da53fb/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index a5ec83f..3277e32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -35,7 +35,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
         String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
         String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, SALT_BUCKETS=2");
             conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
@@ -61,7 +61,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
         String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
-            conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
+            conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')");
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
             conn.commit();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/41da53fb/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
index c78658d..380e718 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
@@ -25,6 +25,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.util.List;
 
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PColumn;
@@ -39,20 +40,39 @@ public class IndexScrutiny {
     public static long scrutinizeIndex(Connection conn, String fullTableName, String fullIndexName) throws SQLException {
         PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
         PTable ptable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+        int tableColumnOffset = 0;
+        List<PColumn> tableColumns = ptable.getColumns();
+        List<PColumn> tablePKColumns = ptable.getPKColumns();
+        if (ptable.getBucketNum() != null) {
+            tableColumnOffset = 1;
+            tableColumns = tableColumns.subList(tableColumnOffset, tableColumns.size());
+            tablePKColumns = tablePKColumns.subList(tableColumnOffset, tablePKColumns.size());
+        }
         PTable pindex = pconn.getTable(new PTableKey(pconn.getTenantId(), fullIndexName));
+        List<PColumn> indexColumns = pindex.getColumns();
+        int indexColumnOffset = 0;
+        if (pindex.getBucketNum() != null) {
+            indexColumnOffset = 1;
+        }
+        if (pindex.getViewIndexId() != null) {
+            indexColumnOffset++;
+        }
+        if (indexColumnOffset > 0) {
+            indexColumns = indexColumns.subList(indexColumnOffset, indexColumns.size());
+        }
         StringBuilder indexQueryBuf = new StringBuilder("SELECT ");
-        for (PColumn dcol : ptable.getPKColumns()) {
+        for (PColumn dcol : tablePKColumns) {
             indexQueryBuf.append("CAST(\"" + IndexUtil.getIndexColumnName(dcol) + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
             indexQueryBuf.append(",");
         }
-        for (PColumn icol : pindex.getColumns()) {
+        for (PColumn icol : indexColumns) {
             PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
             if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
                 indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
                 indexQueryBuf.append(",");
             }
         }
-        for (PColumn icol : pindex.getColumns()) {
+        for (PColumn icol : indexColumns) {
             if (!SchemaUtil.isPKColumn(icol)) {
                 PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
                 indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
@@ -63,11 +83,11 @@ public class IndexScrutiny {
         indexQueryBuf.append("\nFROM " + fullIndexName);
         
         StringBuilder tableQueryBuf = new StringBuilder("SELECT ");
-        for (PColumn dcol : ptable.getPKColumns()) {
+        for (PColumn dcol : tablePKColumns) {
             tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
             tableQueryBuf.append(",");
         }
-        for (PColumn icol : pindex.getColumns()) {
+        for (PColumn icol : indexColumns) {
             PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
             if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
                 if (dcol.getFamilyName() != null) {
@@ -78,7 +98,7 @@ public class IndexScrutiny {
                 tableQueryBuf.append(",");
             }
         }
-        for (PColumn icol : pindex.getColumns()) {
+        for (PColumn icol : indexColumns) {
             if (!SchemaUtil.isPKColumn(icol)) {
                 PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
                 if (dcol.getFamilyName() != null) {
@@ -91,13 +111,13 @@ public class IndexScrutiny {
         }
         tableQueryBuf.setLength(tableQueryBuf.length()-1);
         tableQueryBuf.append("\nFROM " + fullTableName + "\nWHERE (");
-        for (PColumn dcol : ptable.getPKColumns()) {
+        for (PColumn dcol : tablePKColumns) {
             tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
             tableQueryBuf.append(",");
         }
         tableQueryBuf.setLength(tableQueryBuf.length()-1);
         tableQueryBuf.append(") = ((");
-        for (int i = 0; i < ptable.getPKColumns().size(); i++) {
+        for (int i = 0; i < tablePKColumns.size(); i++) {
             tableQueryBuf.append("?");
             tableQueryBuf.append(",");
         }
@@ -114,11 +134,12 @@ public class IndexScrutiny {
         while (irs.next()) {
             icount++;
             StringBuilder pkBuf = new StringBuilder("(");
-            for (int i = 0; i < ptable.getPKColumns().size(); i++) {
-                PColumn dcol = ptable.getPKColumns().get(i);
-                Object pkVal = irs.getObject(i+1);
-                PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(i + 1));
-                istmt.setObject(i+1, pkVal, dcol.getDataType().getSqlType());
+            for (int i = 0; i < tablePKColumns.size(); i++) {
+                PColumn dcol = tablePKColumns.get(i);
+                int offset = i+1;
+                Object pkVal = irs.getObject(offset);
+                PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(offset));
+                istmt.setObject(offset, pkVal, dcol.getDataType().getSqlType());
                 pkBuf.append(pkType.toStringLiteral(pkVal));
                 pkBuf.append(",");
             }
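
For reference, the scrutiny helper is invoked with a connection and the two full table names,
per the signature shown in the diff above; the JDBC URL and table names below are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    import org.apache.phoenix.util.IndexScrutiny;

    public class ScrutinyUsageSketch {
        // Walks every index row, looks up the matching data-table row, and
        // returns the scrutinized row count (a long, per the diff's signature).
        public static long check(String jdbcUrl) throws SQLException {
            try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
                return IndexScrutiny.scrutinizeIndex(conn, "S.T", "S.IDX");
            }
        }
    }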


[3/3] phoenix git commit: PHOENIX-3815 Only disable indexes on which write failures occurred (Vincent Poon)

Posted by ja...@apache.org.
PHOENIX-3815 Only disable indexes on which write failures occurred (Vincent Poon)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8565f365
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8565f365
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8565f365

Branch: refs/heads/4.x-HBase-1.1
Commit: 8565f3653e6d31a503b4e6d105b1437a04d2e2c2
Parents: 41da53f
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 22:37:01 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 13:00:16 2017 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    |  38 +--
 .../phoenix/hbase/index/write/IndexWriter.java  |   8 +-
 .../hbase/index/write/RecoveryIndexWriter.java  |   1 -
 .../TrackingParallelWriterIndexCommitter.java   | 243 +++++++++++++++++++
 .../recovery/StoreFailuresInCachePolicy.java    |   1 +
 .../TrackingParallelWriterIndexCommitter.java   | 236 ------------------
 .../index/PhoenixIndexFailurePolicy.java        |   9 +
 .../index/PhoenixTransactionalIndexer.java      |   7 +-
 .../hbase/index/write/TestIndexWriter.java      |  89 +------
 .../index/write/TestParalleIndexWriter.java     |   4 +-
 .../write/TestParalleWriterIndexCommitter.java  |   4 +-
 11 files changed, 294 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
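
The key piece is the promoted TrackingParallelWriterIndexCommitter, which waits for every index
write to be attempted and reports exactly which tables failed, so the failure policy can disable
only those indexes. A simplified sketch of that bookkeeping (in the real committer a null task
result marks the failure; here an exception does):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class FailureTrackingSketch {
        // Returns the subset of tables whose write task failed; callers can
        // then disable only those indexes and leave the healthy ones active.
        public static List<String> failedTables(List<String> tables,
                List<Callable<Boolean>> writes) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tables.size()));
            try {
                List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
                for (Callable<Boolean> write : writes) {
                    futures.add(pool.submit(write));
                }
                List<String> failures = new ArrayList<String>();
                for (int i = 0; i < futures.size(); i++) {
                    try {
                        futures.get(i).get(); // block until this write has been attempted
                    } catch (ExecutionException e) {
                        failures.add(tables.get(i)); // remember which table failed
                    }
                }
                return Collections.unmodifiableList(failures);
            } finally {
                pool.shutdown();
            }
        }
    }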


http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index c280536..2948831 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -60,6 +60,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.IndexScrutiny;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -228,8 +229,8 @@ public class MutableIndexFailureIT extends BaseTest {
     @Test
     public void testIndexWriteFailure() throws Exception {
         String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME;
-//        String thirdIndexName = "C_" + INDEX_NAME;
-//        String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName);
+        String thirdIndexName = "C_IDX";
+        String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
         try (Connection conn = driver.connect(url, props)) {
@@ -252,8 +253,8 @@ public class MutableIndexFailureIT extends BaseTest {
             // check the drop index.
             conn.createStatement().execute(
                     "CREATE "  + (!localIndex ? "LOCAL " : "") + " INDEX " + secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)");
-//            conn.createStatement().execute(
-//                    "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+            conn.createStatement().execute(
+                    "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
 
             query = "SELECT * FROM " + fullIndexName;
             rs = conn.createStatement().executeQuery(query);
@@ -268,9 +269,9 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(secondIndexName, rs.getString(3));
             assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-//            assertTrue(rs.next());
-//            assertEquals(thirdIndexName, rs.getString(3));
-//            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+            assertTrue(rs.next());
+            assertEquals(thirdIndexName, rs.getString(3));
+            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
             initializeTable(conn, fullTableName);
             
             query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
@@ -303,6 +304,10 @@ public class MutableIndexFailureIT extends BaseTest {
                 assertTrue(PIndexState.ACTIVE.toString().equals(indexState) || PIndexState.PENDING_ACTIVE.toString().equals(indexState));
             } else {
                 assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
+                // non-failing index should remain active
+                ResultSet thirdRs = conn.createStatement().executeQuery(getSysCatQuery(thirdIndexName));
+                assertTrue(thirdRs.next());
+                assertEquals(PIndexState.ACTIVE.getSerializedValue(), thirdRs.getString(1));
             }
             assertFalse(rs.next());
 
@@ -332,8 +337,7 @@ public class MutableIndexFailureIT extends BaseTest {
                 assertEquals("d", rs.getString(2));
                 assertFalse(rs.next());
             }
-            // Comment back in when PHOENIX-3815 is fixed
-//            validateDataWithIndex(conn, fullTableName, thirdFullIndexName, false);
+            IndexScrutiny.scrutinizeIndex(conn, fullTableName, thirdFullIndexName);
 
             if (!failRebuildTask) {
                 // re-enable index table
@@ -359,10 +363,7 @@ public class MutableIndexFailureIT extends BaseTest {
                 checkStateAfterRebuild(conn, fullIndexName, PIndexState.DISABLE);
                 // verify that the index was marked as disabled and the index disable
                 // timestamp set to 0
-                String q =
-                        "SELECT INDEX_STATE, INDEX_DISABLE_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = '"
-                                + schema + "' AND TABLE_NAME = '" + indexName + "'"
-                                + " AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL";
+                String q = getSysCatQuery(indexName);
                 try (ResultSet r = conn.createStatement().executeQuery(q)) {
                     assertTrue(r.next());
                     assertEquals(PIndexState.DISABLE.getSerializedValue(), r.getString(1));
@@ -376,6 +377,15 @@ public class MutableIndexFailureIT extends BaseTest {
         }
     }
 
+    private String getSysCatQuery(String iName) {
+        String q =
+                "SELECT INDEX_STATE, INDEX_DISABLE_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = '"
+                        + schema + "' AND TABLE_NAME = '" + iName + "'"
+                        + " AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL";
+        return q;
+    }
+
+
     private void checkStateAfterRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
         if (!transactional) {
             assertTrue(TestUtil.checkIndexState(conn,fullIndexName, expectedIndexState, 0l));
@@ -543,4 +553,4 @@ public class MutableIndexFailureIT extends BaseTest {
          }
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index a037e92..6b57025 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -47,7 +47,7 @@ import com.google.common.collect.Multimap;
 public class IndexWriter implements Stoppable {
 
   private static final Log LOG = LogFactory.getLog(IndexWriter.class);
-  private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
+  public static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
   public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class";
   private AtomicBoolean stopped = new AtomicBoolean(false);
   private IndexCommitter writer;
@@ -66,10 +66,14 @@ public class IndexWriter implements Stoppable {
     }
 
   public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
+      return getCommitter(env,TrackingParallelWriterIndexCommitter.class);
+  }
+  
+  public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env, Class<? extends IndexCommitter> defaultClass) throws IOException {
     Configuration conf = env.getConfiguration();
     try {
       IndexCommitter committer =
-          conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class,
+          conf.getClass(INDEX_COMMITTER_CONF_KEY, defaultClass,
             IndexCommitter.class).newInstance();
       return committer;
     } catch (InstantiationException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index be542bb..e340784 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..0052f8c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.CapturingAbortable;
+import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.util.IndexUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have been attempted, to allow the caller
+ * to retrieve the failed and succeeded index updates. Therefore, this class will be a lot slower in the face of
+ * failures when compared to the {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used
+ * only when you need to at least attempt all writes and know their result; for instance, this is fine for doing WAL
+ * recovery - it's not a performance-intensive situation and we want to limit the edits we need to retry.
+ * <p>
+ * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that contains the list of
+ * {@link HTableInterfaceReference} that didn't complete successfully.
+ * <p>
+ * Failures to write to the index can happen several different ways:
+ * <ol>
+ * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}). This causes any
+ * pending tasks to fail whatever they are doing as fast as possible. Any writes that have not begun are not even
+ * attempted and are marked as failures.</li>
+ * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index table is not
+ * available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase exceptions.</li>
+ * </ol>
+ * Regardless of how the write fails, we still wait for all writes to complete before passing the failure back to the
+ * client.
+ */
+public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
+    private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
+
+    public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+    private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+    private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
+
+    private TaskRunner pool;
+    private HTableFactory factory;
+    private CapturingAbortable abortable;
+    private Stoppable stopped;
+    private RegionCoprocessorEnvironment env;
+    private KeyValueBuilder kvBuilder;
+
+    // for testing
+    public TrackingParallelWriterIndexCommitter(String hbaseVersion) {
+        kvBuilder = KeyValueBuilder.get(hbaseVersion);
+    }
+
+    public TrackingParallelWriterIndexCommitter() {
+    }
+
+    @Override
+    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+        this.env = env;
+        Configuration conf = env.getConfiguration();
+        setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+                ThreadPoolManager.getExecutor(
+                        new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+                                DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
+                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
+        this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
+    }
+
+    /**
+     * Setup <tt>this</tt>.
+     * <p>
+     * Exposed for TESTING
+     */
+    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+            RegionCoprocessorEnvironment env) {
+        this.pool = new WaitForCompletionTaskRunner(pool);
+        this.factory = factory;
+        this.abortable = new CapturingAbortable(abortable);
+        this.stopped = stop;
+    }
+
+    @Override
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException {
+        Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+        TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
+        List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
+        for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+            // get the mutations for each table. We leak the implementation here a little bit to save
+            // doing a complete copy over of all the index update for each table.
+            final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
+            // track each reference so we can get at it easily later, when determining failures
+            final HTableInterfaceReference tableReference = entry.getKey();
+            final RegionCoprocessorEnvironment env = this.env;
+			if (env != null
+					&& !allowLocalUpdates
+					&& tableReference.getTableName().equals(
+							env.getRegion().getTableDesc().getNameAsString())) {
+				continue;
+			}
+            tables.add(tableReference);
+
+            /*
+             * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
+             * mechanisms: (1) setting aborted or stopped on the IndexWriter, or (2) interrupting the running thread.
+             * The former will only work if we are not in the midst of writing the current batch to the table, though we
+             * do check these status variables before starting and before writing the batch. The latter usage,
+             * interrupting the thread, will work in the previous situations as well as at some points while writing the
+             * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
+             * elaborate on when it supports an interrupt).
+             */
+            tasks.add(new Task<Boolean>() {
+
+                /**
+                 * Do the actual write to the primary table.
+                 */
+                @SuppressWarnings("deprecation")
+                @Override
+                public Boolean call() throws Exception {
+                    HTableInterface table = null;
+                    try {
+                        // this may have been queued, but there was an abort/stop so we try to early exit
+                        throwFailureIfDone();
+                        if (allowLocalUpdates && env!=null && tableReference.getTableName().equals(
+                            env.getRegion().getTableDesc().getNameAsString())) {
+                            try {
+                                throwFailureIfDone();
+                                IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
+                                return Boolean.TRUE;
+                            } catch (IOException ignord) {
+                                // when it's failed we fall back to the standard & slow way
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
+                                            + ignord);
+                                }
+                            }
+                        }
+
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
+                        }
+
+                        table = factory.getTable(tableReference.get());
+                        throwFailureIfDone();
+                        table.batch(mutations);
+                    } catch (InterruptedException e) {
+                        // reset the interrupt status on the thread
+                        Thread.currentThread().interrupt();
+                        throw e;
+                    } catch (Exception e) {
+                        throw e;
+                    } finally {
+                        if (table != null) {
+                            table.close();
+                        }
+                    }
+                    return Boolean.TRUE;
+                }
+
+                private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+                    if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
+                            "Pool closed, not attempting to write to the index!", null); }
+
+                }
+            });
+        }
+
+        List<Boolean> results = null;
+        try {
+            LOG.debug("Waiting on index update tasks to complete...");
+            results = this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+        }
+
+        // track the failures. We only ever access this on return from our calls, so no extra
+        // synchronization is needed. We could update all the failures as we find them, but that adds a
+        // lot of locking overhead, and just doing the copy later is about as efficient.
+        List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+        int index = 0;
+        for (Boolean result : results) {
+            // there was a failure
+            if (result == null) {
+                // we know which table failed by the index of the result
+                failures.add(tables.get(index));
+            }
+            index++;
+        }
+
+        // if any of the tasks failed, then we need to propagate the failure
+        if (failures.size() > 0) {
+            // make the list unmodifiable to avoid any more synchronization concerns
+            throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+        }
+        return;
+    }
+
+    @Override
+    public void stop(String why) {
+        LOG.info("Shutting down " + this.getClass().getSimpleName());
+        this.pool.stop(why);
+        this.factory.shutdown();
+    }
+
+    @Override
+    public boolean isStopped() {
+        return this.stopped.isStopped();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
index 189f970..e28a0bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
 import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
+import org.apache.phoenix.hbase.index.write.TrackingParallelWriterIndexCommitter;
 
 /**
  * Tracks any failed writes in The {@link PerRegionIndexWriteCache}, given a

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
deleted file mode 100644
index 2b15a5e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.apache.phoenix.hbase.index.write.recovery;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.hbase.index.CapturingAbortable;
-import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
-import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
-import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
-import org.apache.phoenix.hbase.index.parallel.Task;
-import org.apache.phoenix.hbase.index.parallel.TaskBatch;
-import org.apache.phoenix.hbase.index.parallel.TaskRunner;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
-import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
-import org.apache.phoenix.hbase.index.table.HTableFactory;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.write.IndexCommitter;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
-import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
-import org.apache.phoenix.util.IndexUtil;
-
-import com.google.common.collect.Multimap;
-
-/**
- * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have attempted to allow the caller to
- * retrieve the failed and succeeded index updates. Therefore, this class will be a lot slower, in the face of failures,
- * when compared to the {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
- * you need to at least attempt all writes and know their result; for instance, this is fine for doing WAL recovery -
- * it's not a performance intensive situation and we want to limit the the edits we need to retry.
- * <p>
- * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that contains the list of
- * {@link HTableInterfaceReference} that didn't complete successfully.
- * <p>
- * Failures to write to the index can happen several different ways:
- * <ol>
- * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}. This causing any
- * pending tasks to fail whatever they are doing as fast as possible. Any writes that have not begun are not even
- * attempted and marked as failures.</li>
- * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index table is not
- * available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase exceptions.</li>
- * </ol>
- * Regardless of how the write fails, we still wait for all writes to complete before passing the failure back to the
- * client.
- */
-public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
-    private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
-
-    public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
-    private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
-    private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.trackingwriter.threads.keepalivetime";
-
-    private TaskRunner pool;
-    private HTableFactory factory;
-    private CapturingAbortable abortable;
-    private Stoppable stopped;
-    private RegionCoprocessorEnvironment env;
-
-    @Override
-    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
-        this.env = env;
-        Configuration conf = env.getConfiguration();
-        setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
-                ThreadPoolManager.getExecutor(
-                        new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
-                                DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
-                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
-    }
-
-    /**
-     * Setup <tt>this</tt>.
-     * <p>
-     * Exposed for TESTING
-     */
-    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
-            RegionCoprocessorEnvironment env) {
-        this.pool = new WaitForCompletionTaskRunner(pool);
-        this.factory = factory;
-        this.abortable = new CapturingAbortable(abortable);
-        this.stopped = stop;
-    }
-
-    @Override
-    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException {
-        Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-        TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
-        List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
-        for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
-            // get the mutations for each table. We leak the implementation here a little bit to save
-            // doing a complete copy over of all the index update for each table.
-            final List<Mutation> mutations = (List<Mutation>)entry.getValue();
-            // track each reference so we can get at it easily later, when determing failures
-            final HTableInterfaceReference tableReference = entry.getKey();
-            final RegionCoprocessorEnvironment env = this.env;
-			if (env != null
-					&& !allowLocalUpdates
-					&& tableReference.getTableName().equals(
-							env.getRegion().getTableDesc().getNameAsString())) {
-				continue;
-			}
-            tables.add(tableReference);
-
-            /*
-             * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
-             * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
-             * The former will only work if we are not in the midst of writing the current batch to the table, though
-             * we do check these status variables before starting and before writing the batch. The latter usage,
-             * interrupting the thread, will work in the previous situations as well as at some points while writing
-             * the batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but
-             * doesn't elaborate on when it supports an interrupt).
-             */
-            tasks.add(new Task<Boolean>() {
-
-                /**
-                 * Do the actual write to the primary table.
-                 */
-                @SuppressWarnings("deprecation")
-                @Override
-                public Boolean call() throws Exception {
-                    HTableInterface table = null;
-                    try {
-                        // this may have been queued, but if there was an abort/stop we try to exit early
-                        throwFailureIfDone();
-                        if (allowLocalUpdates && env!=null && tableReference.getTableName().equals(
-                            env.getRegion().getTableDesc().getNameAsString())) {
-                            try {
-                                throwFailureIfDone();
-                                IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
-                                return Boolean.TRUE;
-                            } catch (IOException ignored) {
-                                // if the local write fails, we fall back to the standard & slower HTable path
-                                if (LOG.isTraceEnabled()) {
-                                    LOG.trace("indexRegion.batchMutate failed; falling back to HTable.batch(). Got error="
-                                            + ignored);
-                                }
-                            }
-                        }
-
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("Writing index update: " + mutations + " to table: " + tableReference);
-                        }
-
-                        table = factory.getTable(tableReference.get());
-                        throwFailureIfDone();
-                        table.batch(mutations);
-                    } catch (InterruptedException e) {
-                        // reset the interrupt status on the thread
-                        Thread.currentThread().interrupt();
-                        throw e;
-                    } catch (Exception e) {
-                        throw e;
-                    } finally {
-                        if (table != null) {
-                            table.close();
-                        }
-                    }
-                    return Boolean.TRUE;
-                }
-
-                private void throwFailureIfDone() throws SingleIndexWriteFailureException {
-                    if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) {
-                        throw new SingleIndexWriteFailureException(
-                                "Pool closed, not attempting to write to the index!", null);
-                    }
-                }
-            });
-        }
-
-        List<Boolean> results = null;
-        try {
-            LOG.debug("Waiting on index update tasks to complete...");
-            results = this.pool.submitUninterruptible(tasks);
-        } catch (ExecutionException e) {
-            throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
-        } catch (EarlyExitFailure e) {
-            throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
-        }
-
-        // track the failures. We only ever access this on return from our calls, so no extra
-        // synchronization is needed. We could update the failures as we find them, but that adds a
-        // lot of locking overhead, and just doing the copy later is about as efficient.
-        List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
-        int index = 0;
-        for (Boolean result : results) {
-            // there was a failure
-            if (result == null) {
-                // we know which table failed by the index of the result
-                failures.add(tables.get(index));
-            }
-            index++;
-        }
-
-        // if any of the tasks failed, then we need to propagate the failure
-        if (failures.size() > 0) {
-            // make the list unmodifiable to avoid any more synchronization concerns
-            throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
-        }
-        return;
-    }
-
-    @Override
-    public void stop(String why) {
-        LOG.info("Shutting down " + this.getClass().getSimpleName());
-        this.pool.stop(why);
-        this.factory.shutdown();
-    }
-
-    @Override
-    public boolean isStopped() {
-        return this.stopped.isStopped();
-    }
-}
\ No newline at end of file

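For context on how callers consume the failure tracking above: write() throws a
MultiIndexWriteFailureException carrying only the index tables whose batches failed,
which a failure policy can inspect via getFailedTables() (see the
PhoenixIndexFailurePolicy hunk below). A minimal caller-side sketch - the
handleFailure name and signature here are hypothetical, for illustration only:

    import java.util.Collection;
    import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
    import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;

    void handleFailure(Exception cause) {
        if (cause instanceof MultiIndexWriteFailureException) {
            // only the tables listed here had failed writes; all other index
            // tables in the batch were written successfully
            Collection<HTableInterfaceReference> failed =
                    ((MultiIndexWriteFailureException) cause).getFailedTables();
            for (HTableInterfaceReference ref : failed) {
                // disable (or schedule a rebuild of) only the failed indexes
            }
        }
    }
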
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index ee3b380..0fc138f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
+import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
 import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
@@ -166,7 +167,15 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
         // start by looking at all the tables to which we attempted to write
         long timestamp = 0;
         boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure;
+        // if using TrackingParallelWriter, we know which indexes failed and only disable those
+        Set<HTableInterfaceReference> failedTables = cause instanceof MultiIndexWriteFailureException 
+                ? new HashSet<HTableInterfaceReference>(((MultiIndexWriteFailureException)cause).getFailedTables())
+                : Collections.<HTableInterfaceReference>emptySet();
+        
         for (HTableInterfaceReference ref : refs) {
+            if (failedTables.size() > 0 && !failedTables.contains(ref)) {
+                continue; // leave index active if its writes succeeded
+            }
             long minTimeStamp = 0;
 
             // get the minimum timestamp across all the mutations we attempted on that table

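Put differently, the new filter narrows failure handling to the indexes that
actually failed; when the cause is not a MultiIndexWriteFailureException,
failedTables stays empty and every attempted index is handled as before. A
condensed sketch of the control flow added above (refs is the set of index
tables to which writes were attempted):

    for (HTableInterfaceReference ref : refs) {
        if (!failedTables.isEmpty() && !failedTables.contains(ref)) {
            continue; // this index's writes all succeeded, so it stays active
        }
        // fall through: handle this index per the configured policy flags
    }
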
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 969378d..bc53b6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -76,6 +76,7 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy;
+import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
@@ -142,8 +143,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
         // setup the actual index writer
         // For transactional tables, we keep the index active upon a write failure
-        // since we have the all versus none behavior for transactions.
-        this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
+        // since we have the all versus none behavior for transactions. Also, we
+        // fail on any write exception since this will end up failing the transaction.
+        this.writer = new IndexWriter(IndexWriter.getCommitter(indexWriterEnv, ParallelWriterIndexCommitter.class),
+                new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
     }
 
     @Override

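The practical effect for transactional tables: ParallelWriterIndexCommitter fails
fast on the first write error, LeaveIndexActiveFailurePolicy rethrows rather than
disabling the index, and the propagated exception fails the client's transaction,
so the data table and index stay consistent without ever disabling the index. A
rough sketch of that failure path (illustrative only, not code from this commit):

    try {
        // fails fast on the first index write error for transactional tables
        writer.write(indexUpdates, false);
    } catch (Exception e) {
        // LeaveIndexActiveFailurePolicy keeps the index ACTIVE; rethrowing
        // here fails the enclosing transaction, which rolls back the data
        // table writes as well ("all versus none")
        throw e;
    }
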
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index 012f08e..b0e3780 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -131,7 +131,7 @@ public class TestIndexWriter {
     tables.put(new ImmutableBytesPtr(tableName), table);
 
     // setup the writer and failure policy
-    ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+    TrackingParallelWriterIndexCommitter committer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
     committer.setup(factory, exec, abort, stop, e);
     KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
     policy.setup(stop, abort);
@@ -145,91 +145,6 @@ public class TestIndexWriter {
   }
 
   /**
-   * Index updates can potentially be queued up if there aren't enough writer threads. If a running
-   * index write fails, then we should early exit the pending index update, when it comes up (if the
-   * pool isn't already shut down).
-   * <p>
-   * This test is a little bit racy - we could actually have the failure of the first task before
-   * the third task is even submitted. However, we should never see the third task attempt to make
-   * the batch write, so we should never see a failure here.
-   * @throws Exception on failure
-   */
-  @SuppressWarnings({ "unchecked", "deprecation" })
-  @Test
-  public void testFailureOnRunningUpdateAbortsPending() throws Exception {
-    Abortable abort = new StubAbortable();
-    Stoppable stop = Mockito.mock(Stoppable.class);
-    // fixed-size thread pool so a later request can get queued behind the running writes
-    ExecutorService exec = Executors.newFixedThreadPool(3);
-    Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
-    FakeTableFactory factory = new FakeTableFactory(tables);
-    // updates to two different tables
-    byte[] tableName = Bytes.add(this.testName.getTableName(), new byte[] { 1, 2, 3, 4 });
-    byte[] tableName2 = this.testName.getTableName(); // this will sort after the first table name
-    // first table will fail
-    HTableInterface table = Mockito.mock(HTableInterface.class);
-    Mockito.when(table.batch(Mockito.anyList())).thenThrow(
-      new IOException("Intentional IOException for failed first write."));
-    Mockito.when(table.getTableName()).thenReturn(tableName);
-    RegionCoprocessorEnvironment e = Mockito.mock(RegionCoprocessorEnvironment.class);
-    Configuration conf = new Configuration();
-    Mockito.when(e.getConfiguration()).thenReturn(conf);
-    Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
-    // second table just blocks to make sure that the abort propagates to the third task
-    final CountDownLatch waitOnAbortedLatch = new CountDownLatch(1);
-    final boolean[] failed = new boolean[] { false };
-    HTableInterface table2 = Mockito.mock(HTableInterface.class);
-    Mockito.when(table2.getTableName()).thenReturn(tableName2);
-    Mockito.when(table2.batch(Mockito.anyList())).thenAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        waitOnAbortedLatch.await();
-        return null;
-      }
-    }).thenAnswer(new Answer<Void>() {
-      @Override
-      public Void answer(InvocationOnMock invocation) throws Throwable {
-        failed[0] = true;
-        throw new RuntimeException(
-            "Unexpected exception - second index table shouldn't have been written to");
-      }
-    });
-
-    // add the tables to the set of tables, so they're returned to the writer
-    tables.put(new ImmutableBytesPtr(tableName), table);
-    tables.put(new ImmutableBytesPtr(tableName2), table2);
-
-    ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
-    committer.setup(factory, exec, abort, stop, e);
-    KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
-    policy.setup(stop, abort);
-    IndexWriter writer = new IndexWriter(committer, policy);
-    try {
-        Put m = new Put(row);
-        m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
-        HTableInterfaceReference ht1 = new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
-        HTableInterfaceReference ht2 = new HTableInterfaceReference(new ImmutableBytesPtr(tableName2));
-        // We need to apply updates first for table1 and then table2.
-        Multimap<HTableInterfaceReference, Mutation> indexUpdates = LinkedListMultimap.create();
-        indexUpdates.put(ht1, m);
-        indexUpdates.put(ht2, m);
-        indexUpdates.put(ht2, m);
-        writer.write(indexUpdates, false);
-        fail("Should not have successfully completed all index writes");
-    } catch (SingleIndexWriteFailureException s) {
-      LOG.info("Correctly got a failure to reach the index", s);
-      // should have gotten the abort correctly, so let the next task execute
-      waitOnAbortedLatch.countDown();
-    }
-    assertFalse(
-      "Third set of index writes should never have been attempted - should have seen the abort before done!",
-      failed[0]);
-    writer.stop(this.testName.getTableNameString() + " finished");
-    assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
-    assertTrue("ExecutorService isn't terminated after writer#stop!", exec.isShutdown());
-  }
-
-  /**
   * Test that if we get an interruption to the thread while doing a batch (e.g. via shutdown),
    * that we correctly end the task
    * @throws Exception on failure
@@ -279,7 +194,7 @@ public class TestIndexWriter {
     indexUpdates.add(new Pair<Mutation, byte[]>(m, tableName));
 
     // setup the writer
-    ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+    TrackingParallelWriterIndexCommitter committer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
     committer.setup(factory, exec, abort, stop, e );
     KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
     policy.setup(stop, abort);

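The removed testFailureOnRunningUpdateAbortsPending covered abort-on-first-failure
behavior specific to ParallelWriterIndexCommitter. TrackingParallelWriterIndexCommitter
runs its tasks on a WaitForCompletionTaskRunner instead: a failed table write no
longer aborts the pending writes to other tables, so the test's assertion that the
third task is never attempted no longer applies. The behavior the remaining tests
exercise looks roughly like this (a sketch reusing the test fixtures above):

    TrackingParallelWriterIndexCommitter committer =
            new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
    committer.setup(factory, exec, abort, stop, e);
    try {
        committer.write(indexUpdates, false);
    } catch (MultiIndexWriteFailureException ex) {
        // every table's batch was attempted; only the failed ones are reported
        LOG.info("Index tables that failed: " + ex.getFailedTables());
    }
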
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
index e62af7a..3e2b47c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
@@ -66,7 +66,7 @@ public class TestParalleIndexWriter {
     Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
     FakeTableFactory factory = new FakeTableFactory(
         Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
-    ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+    TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
     Abortable mockAbort = Mockito.mock(Abortable.class);
     Stoppable mockStop = Mockito.mock(Stoppable.class);
     // create a simple writer
@@ -116,7 +116,7 @@ public class TestParalleIndexWriter {
     tables.put(tableName, table);
 
     // setup the writer and failure policy
-    ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+    TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
     writer.setup(factory, exec, abort, stop, e);
     writer.write(indexUpdates, true);
     assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8565f365/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
index 789e7a1..32a6661 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
@@ -62,7 +62,7 @@ public class TestParalleWriterIndexCommitter {
     ExecutorService exec = Executors.newFixedThreadPool(1);
     FakeTableFactory factory = new FakeTableFactory(
         Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
-    ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+    TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
     Abortable mockAbort = Mockito.mock(Abortable.class);
     Stoppable mockStop = Mockito.mock(Stoppable.class);
     RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
@@ -117,7 +117,7 @@ public class TestParalleWriterIndexCommitter {
     tables.put(tableName, table);
 
     // setup the writer and failure policy
-    ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+    TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
     writer.setup(factory, exec, abort, stop, e);
     writer.write(indexUpdates, true);
     assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",