Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/07/31 17:58:34 UTC

phoenix git commit: PHOENIX-4053 Lock row exclusively when necessary for mutable secondary indexing

Repository: phoenix
Updated Branches:
  refs/heads/master 9c458fa3d -> 54d9e1c36


PHOENIX-4053 Lock row exclusively when necessary for mutable secondary indexing


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/54d9e1c3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/54d9e1c3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/54d9e1c3

Branch: refs/heads/master
Commit: 54d9e1c36c46e7c50c29def08cf866599c7a4e45
Parents: 9c458fa
Author: James Taylor <ja...@apache.org>
Authored: Mon Jul 31 10:57:22 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Mon Jul 31 10:57:22 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/ConcurrentMutationsIT.java  | 405 +++++++++++++++++++
 .../org/apache/phoenix/hbase/index/Indexer.java |  61 ++-
 .../apache/phoenix/hbase/index/LockManager.java | 252 ++++++++++++
 .../hbase/index/builder/IndexBuildManager.java  |  12 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |   2 +
 5 files changed, 713 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
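
For orientation, here is a small, self-contained sketch of the row lock
contention the tests below exercise. It uses only the LockManager API
introduced by this commit; the scenario and timings are illustrative, not
taken from the patch:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.LockManager;

    public class RowLockContentionDemo {
        public static void main(String[] args) throws Exception {
            final LockManager lockManager = new LockManager();
            final byte[] row = Bytes.toBytes("foo");
            lockManager.lockRow(row, 1000); // main thread now holds the row
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // The row is held exclusively by main, so this times
                        // out with TimeoutIOException after 500 ms.
                        lockManager.lockRow(row, 500);
                    } catch (Exception e) {
                        System.out.println("second locker timed out: " + e);
                    }
                }
            });
            t2.start();
            t2.join();
            lockManager.unlockRow(row); // other threads may now acquire it
        }
    }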


http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
new file mode 100644
index 0000000..19cb70e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT {
+    private static final Random RAND = new Random();
+    private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";  
+    private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
+    private static final int ROW_LOCK_WAIT_TIME = 10000;
+    
+    private final Object lock = new Object();
+    private long scn = 100;
+
+    private static void addDelayingCoprocessor(Connection conn, String tableName) throws SQLException, IOException {
+        int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+        ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+        HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+        descriptor.addCoprocessor(DelayingRegionObserver.class.getName(), null, priority, null);
+        HBaseAdmin admin = services.getAdmin();
+        try {
+            admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+        } finally {
+            admin.close();
+        }
+    }
+    
+    @Test
+    @Ignore
+    public void testSynchronousDeletesAndUpsertValues() throws Exception {
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))");
+        addDelayingCoprocessor(conn, tableName);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)");
+        final CountDownLatch doneSignal = new CountDownLatch(2);
+        Runnable r1 = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+                    for (int i = 0; i < 50; i++) {
+                        Thread.sleep(20);
+                        synchronized (lock) {
+                            scn += 10;
+                            PhoenixConnection conn = null;
+                            try {
+                                props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+                                conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+                                conn.setAutoCommit(true);
+                                conn.createStatement().execute("DELETE FROM " + tableName);
+                            } finally {
+                                if (conn != null) conn.close();
+                            }
+                        }
+                    }
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // restore the interrupt flag
+                    throw new RuntimeException(e);
+                } finally {
+                    doneSignal.countDown();
+                }
+            }
+            
+        };
+        Runnable r2 = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+                    int nRowsToUpsert = 1000;
+                    for (int i = 0; i < nRowsToUpsert; i++) {
+                        synchronized(lock) {
+                            scn += 10;
+                            PhoenixConnection conn = null;
+                            try {
+                                props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+                                conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+                                conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)");
+                                if ((i % 20) == 0 || i == nRowsToUpsert-1 ) {
+                                    conn.commit();
+                                }
+                            } finally {
+                                if (conn != null) conn.close();
+                            }
+                        }
+                    }
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                } finally {
+                    doneSignal.countDown();
+                }
+            }
+            
+        };
+        Thread t1 = new Thread(r1);
+        t1.start();
+        Thread t2 = new Thread(r2);
+        t2.start();
+        
+        doneSignal.await(60, TimeUnit.SECONDS);
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 == count2);
+    }
+
+    @Test
+    @Ignore
+    public void testConcurrentDeletesAndUpsertValues() throws Exception {
+        final String tableName = generateUniqueName();
+        final String indexName = generateUniqueName();
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))");
+        addDelayingCoprocessor(conn, tableName);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)");
+        final CountDownLatch doneSignal = new CountDownLatch(2);
+        Runnable r1 = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Connection conn = DriverManager.getConnection(getUrl());
+                    conn.setAutoCommit(true);
+                    for (int i = 0; i < 50; i++) {
+                        Thread.sleep(20);
+                        conn.createStatement().execute("DELETE FROM " + tableName);
+                    }
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // restore the interrupt flag
+                    throw new RuntimeException(e);
+                } finally {
+                    doneSignal.countDown();
+                }
+            }
+            
+        };
+        Runnable r2 = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Connection conn = DriverManager.getConnection(getUrl());
+                    for (int i = 0; i < 1000; i++) {
+                        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)");
+                        if ((i % 20) == 0) {
+                            conn.commit();
+                        }
+                    }
+                    conn.commit();
+                } catch (SQLException e) {
+                    throw new RuntimeException(e);
+                } finally {
+                    doneSignal.countDown();
+                }
+            }
+            
+        };
+        Thread t1 = new Thread(r1);
+        t1.start();
+        Thread t2 = new Thread(r2);
+        t2.start();
+        
+        doneSignal.await(60, TimeUnit.SECONDS);
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 == count2);
+    }
+    
+    @Test
+    public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
+        final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
+        final String indexName = generateUniqueName();
+        Connection conn = DriverManager.getConnection(getUrl());
+        
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER)");
+        addDelayingCoprocessor(conn, tableName);
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v)");
+        final CountDownLatch doneSignal = new CountDownLatch(2);
+        final String[] failedMsg = new String[1];
+        Runnable r1 = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Connection conn = DriverManager.getConnection(getUrl());
+                    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)");
+                    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)");
+                    conn.commit();
+                } catch (Exception e) {
+                    failedMsg[0] = e.getMessage();
+                    throw new RuntimeException(e);
+                } finally {
+                    doneSignal.countDown();
+                }
+            }
+            
+        };
+        Runnable r2 = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Connection conn = DriverManager.getConnection(getUrl());
+                    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)");
+                    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',3)");
+                    conn.commit();
+                } catch (Exception e) {
+                    failedMsg[0] = e.getMessage();
+                    throw new RuntimeException(e);
+                } finally {
+                    doneSignal.countDown();
+                }
+            }
+            
+        };
+        Thread t1 = new Thread(r1);
+        t1.start();
+        Thread t2 = new Thread(r2);
+        t2.start();
+        
+        doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.MILLISECONDS);
+        assertNull(failedMsg[0], failedMsg[0]);
+    }
+
+    @Test
+    public void testLockUntilMVCCAdvanced() throws Exception {
+        final String tableName = MVCC_LOCK_TEST_TABLE_PREFIX + generateUniqueName();
+        final String indexName = generateUniqueName();
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER)");
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v,k)");
+        conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)");
+        conn.commit();
+        addDelayingCoprocessor(conn, tableName);
+        final CountDownLatch doneSignal = new CountDownLatch(2);
+        final String[] failedMsg = new String[1];
+        Runnable r1 = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Connection conn = DriverManager.getConnection(getUrl());
+                    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)");
+                    conn.commit();
+                } catch (Exception e) {
+                    failedMsg[0] = e.getMessage();
+                    throw new RuntimeException(e);
+                } finally {
+                    doneSignal.countDown();
+                }
+            }
+            
+        };
+        Runnable r2 = new Runnable() {
+
+            @Override
+            public void run() {
+                try {
+                    Connection conn = DriverManager.getConnection(getUrl());
+                    conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)");
+                    conn.commit();
+                } catch (Exception e) {
+                    failedMsg[0] = e.getMessage();
+                    throw new RuntimeException(e);
+                } finally {
+                    doneSignal.countDown();
+                }
+            }
+            
+        };
+        Thread t1 = new Thread(r1);
+        t1.start();
+        Thread t2 = new Thread(r2);
+        t2.start();
+        
+        doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.MILLISECONDS);
+        
+        TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+        TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+        long count1 = getRowCount(conn, tableName);
+        long count2 = getRowCount(conn, indexName);
+        assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 == count2);
+        
+        ResultSet rs1 = conn.createStatement().executeQuery("SELECT * FROM " + indexName);
+        assertTrue(rs1.next());
+        ResultSet rs2 = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE k = '" + rs1.getString(2) + "'");
+        assertTrue("Could not find row in table where k = '" + rs1.getString(2) + "'", rs2.next());
+        assertEquals(rs1.getInt(1), rs2.getInt(2));
+        assertFalse(rs1.next());
+        assertFalse(rs2.next());
+    }
+
+    private static long getRowCount(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName);
+        assertTrue(rs.next());
+        return rs.getLong(1);
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(10);
+        clientProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, Integer.toString(0));
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+        serverProps.put("hbase.rowlock.wait.duration", Integer.toString(ROW_LOCK_WAIT_TIME));
+        serverProps.put(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(3));
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+
+    public static class DelayingRegionObserver extends SimpleRegionObserver {
+        private volatile boolean lockedTableRow;
+        
+        @Override
+        public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+            try {
+                String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString();
+                if (tableName.startsWith(MVCC_LOCK_TEST_TABLE_PREFIX)) {
+                    Thread.sleep(ROW_LOCK_WAIT_TIME/2); // Wait long enough that they'll both have the same mvcc
+                }
+            } catch (InterruptedException e) { // best-effort delay; ignore interrupts
+            }
+        }
+        
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            try {
+                String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString();
+                if (tableName.startsWith(LOCK_TEST_TABLE_PREFIX)) {
+                    if (lockedTableRow) {
+                        throw new DoNotRetryIOException("Expected lock in preBatchMutate to be exclusive, but it wasn't for row " + Bytes.toStringBinary(miniBatchOp.getOperation(0).getRow()));
+                    }
+                    lockedTableRow = true;
+                    Thread.sleep(ROW_LOCK_WAIT_TIME + 2000);
+                }
+                Thread.sleep(RAND.nextInt(10)); // bound directly; Math.abs(Integer.MIN_VALUE) is still negative
+            } catch (InterruptedException e) { // best-effort delay; ignore interrupts
+            } finally {
+                lockedTableRow = false;
+            }
+            
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index ea5bf4f..8523977 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -117,6 +117,7 @@ public class Indexer extends BaseRegionObserver {
 
   protected IndexWriter writer;
   protected IndexBuildManager builder;
+  private LockManager lockManager;
 
   /** Configuration key for the {@link IndexBuilder} to use */
   public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
@@ -162,16 +163,19 @@ public class Indexer extends BaseRegionObserver {
   private long slowPreWALRestoreThreshold;
   private long slowPostOpenThreshold;
   private long slowPreIncrementThreshold;
-
+  private int rowLockWaitDuration;
+  
   public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY;
 
-    public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil
+  public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil
             .encodeMaxPatchVersion(0, 94);
-    public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil
+  public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil
             .encodeVersion("0.94.0");
-    private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil
+  private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil
             .encodeVersion("0.94.9");
 
+  private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+
   @Override
   public void start(CoprocessorEnvironment e) throws IOException {
       try {
@@ -206,6 +210,10 @@ public class Indexer extends BaseRegionObserver {
         DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
         // setup the actual index writer
         this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer");
+        
+        this.rowLockWaitDuration = clonedConfig.getInt("hbase.rowlock.wait.duration",
+                DEFAULT_ROWLOCK_WAIT_DURATION);
+        this.lockManager = new LockManager();
 
         // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
         this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
@@ -346,8 +354,9 @@ public class Indexer extends BaseRegionObserver {
         "Somehow didn't return an index update but also didn't propagate the failure to the client!");
   }
 
-  private static final OperationStatus SUCCESS = new OperationStatus(OperationStatusCode.SUCCESS);
-  
+  private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
+  private static final OperationStatus FAILURE = new OperationStatus(OperationStatusCode.FAILURE, "Unable to acquire row lock");
+
   public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
 
@@ -365,7 +374,7 @@ public class Indexer extends BaseRegionObserver {
       for (int i = 0; i < miniBatchOp.size(); i++) {
           Mutation m = miniBatchOp.getOperation(i);
           if (this.builder.isAtomicOp(m)) {
-              miniBatchOp.setOperationStatus(i, SUCCESS);
+              miniBatchOp.setOperationStatus(i, IGNORE);
               continue;
           }
           // skip this mutation if we aren't enabling indexing
@@ -373,13 +382,40 @@ public class Indexer extends BaseRegionObserver {
           // should be indexed, which means we need to expose another method on the builder. Such is the
           // way optimization go though.
           if (this.builder.isEnabled(m)) {
+              boolean success = false;
+              try {
+                  lockManager.lockRow(m.getRow(), rowLockWaitDuration);
+                  success = true;
+              } finally {
+                  if (!success) {
+                      // We're throwing here as a result of either a timeout while waiting
+                      // for the row lock or an interrupt. Either way, the lock on the
+                      // current row was unsuccessful and we won't be locking any more rows
+                      // since we're throwing. By setting the operation status to FAILURE
+                      // here, we prevent the attempt to unlock rows we've never locked when
+                      // postBatchMutateIndispensably is executed. We're very limited wrt
+                      // the state that can be shared between the batch mutate coprocessor
+                      // calls (see HBASE-18127).
+                      // Note that we shouldn't necessarily be throwing here, since we're
+                      // essentially failing the data write because we can't do the locking
+                      // necessary for performing consistent index maintenance. We'd ideally
+                      // want to go through the index failure policy to determine what action
+                      // to perform. We currently cannot ignore this lock failure as we lack
+                      // the ability to keep that state (PHOENIX-4055).
+                      for (int j = i; j < miniBatchOp.size(); j++) {
+                          miniBatchOp.setOperationStatus(j, FAILURE);
+                      }
+                  }
+              }
               Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ? 
                       defaultDurability : m.getDurability();
               if (effectiveDurablity.ordinal() > durability.ordinal()) {
                   durability = effectiveDurablity;
               }
     
-              // add the mutation to the batch set
+              // TODO: remove this code as Phoenix prevents any duplicate
+              // rows in the batch mutation from the client side (PHOENIX-4054).
+              // Add the mutation to the batch set
               ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
               MultiMutation stored = mutations.get(row);
               // we haven't seen this row before, so add it
@@ -390,7 +426,7 @@ public class Indexer extends BaseRegionObserver {
               stored.addAll(m);
           }
       }
-
+    
       // early exit if it turns out we don't have any edits
       if (mutations.isEmpty()) {
           return;
@@ -403,6 +439,7 @@ public class Indexer extends BaseRegionObserver {
           edit = new WALEdit();
           miniBatchOp.setWalEdit(0, edit);
       }
+  
 
       // get the current span, or just use a null-span to avoid a bunch of if statements
       try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -488,6 +525,12 @@ public class Indexer extends BaseRegionObserver {
       }
       long start = EnvironmentEdgeManager.currentTimeMillis();
       try {
+          for (int i = 0; i < miniBatchOp.size(); i++) {
+              OperationStatus status = miniBatchOp.getOperationStatus(i);
+              if (status != IGNORE && status != FAILURE) {
+                  lockManager.unlockRow(miniBatchOp.getOperation(i).getRow());
+              }
+          }
           this.builder.batchCompleted(miniBatchOp);
 
           if (success) { // if miniBatchOp was successfully written, write index updates
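
For reference, a hedged skeleton of the lock/unlock pairing the diff above
establishes across the batch mutation hooks. This is a simplification for
illustration only: the real Indexer also tracks per-operation status (IGNORE,
FAILURE) so that only rows it actually locked get unlocked:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
    import org.apache.phoenix.hbase.index.LockManager;

    public class LockingObserverSketch extends BaseRegionObserver {
        private final LockManager lockManager = new LockManager();
        private final int rowLockWaitDuration = 30000; // Indexer's default, in ms

        @Override
        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            // Lock every row in the batch before index updates are built,
            // so the index sees a stable snapshot of each row.
            for (int i = 0; i < miniBatchOp.size(); i++) {
                lockManager.lockRow(miniBatchOp.getOperation(i).getRow(), rowLockWaitDuration);
            }
        }

        @Override
        public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean success) throws IOException {
            // Unlock by row key; RowLock handles can't be passed between
            // coprocessor hooks (HBASE-18482).
            for (int i = 0; i < miniBatchOp.size(); i++) {
                lockManager.unlockRow(miniBatchOp.getOperation(i).getRow());
            }
        }
    }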

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
new file mode 100644
index 0000000..02e4c3c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * 
+ * Manages reentrant row locks based on the row key, copied for the most part
+ * from the HRegion.getRowLockInternal implementation. Phoenix needs to manage
+ * its own locking because secondary index maintenance requires a consistent
+ * snapshot of the row from the time the mvcc is acquired until the time it is
+ * advanced (PHOENIX-4053).
+ *
+ */
+public class LockManager {
+    private static final Log LOG = LogFactory.getLog(LockManager.class);
+
+    private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext> lockedRows =
+            new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>();
+
+    public LockManager () {
+    }
+
+    /**
+     * Lock the row, throwing if the lock cannot be acquired.
+     * @param row the row key
+     * @param waitDuration maximum time to wait for the lock, in milliseconds
+     * @return RowLock used to eventually release the lock
+     * @throws TimeoutIOException if the lock could not be acquired within
+     * waitDuration, or InterruptedIOException if interrupted while waiting
+     * to acquire the lock.
+     */
+    public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
+        // create an object to use as a key in the row lock map
+        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
+
+        RowLockContext rowLockContext = null;
+        RowLockImpl result = null;
+        TraceScope traceScope = null;
+
+        // If we're tracing start a span to show how long this took.
+        if (Trace.isTracing()) {
+            traceScope = Trace.startSpan("LockManager.getRowLock");
+            traceScope.getSpan().addTimelineAnnotation("Getting a lock");
+        }
+
+        boolean success = false;
+        try {
+            // Keep trying until we have a lock or error out.
+            // TODO: do we need to add a time component here?
+            while (result == null) {
+
+                // Try adding a RowLockContext to the lockedRows.
+                // If we can add it, then no other thread currently holds a lock on this row.
+                rowLockContext = new RowLockContext(rowKey);
+                RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+
+                // if another thread got there first, there's already a context to share.
+                if (existingContext != null) {
+                    rowLockContext = existingContext;
+                }
+
+                result = rowLockContext.newRowLock();
+            }
+            if (!result.getLock().tryLock(waitDuration, TimeUnit.MILLISECONDS)) {
+                if (traceScope != null) {
+                    traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
+                }
+                throw new TimeoutIOException("Timed out waiting for lock for row: " + rowKey);
+            }
+            rowLockContext.setThreadName(Thread.currentThread().getName());
+            success = true;
+            return result;
+        } catch (InterruptedException ie) {
+            LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
+            InterruptedIOException iie = new InterruptedIOException();
+            iie.initCause(ie);
+            if (traceScope != null) {
+                traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
+            }
+            Thread.currentThread().interrupt();
+            throw iie;
+        } finally {
+            // On failure, clean up the counts just in case this was the thing keeping the context alive.
+            if (!success && rowLockContext != null) rowLockContext.cleanUp();
+            if (traceScope != null) {
+                traceScope.close();
+            }
+        }
+    }
+
+    /**
+     * Unlock the row. We need this stateless way of unlocking because
+     * we have no means of passing the RowLock instances between
+     * coprocessor calls (see HBASE-18482). Once we have that, we
+     * can have the caller collect RowLock instances and free them when
+     * needed.
+     * @param row the row key
+     * @throws IOException
+     */
+    public void unlockRow(byte[] row) throws IOException {
+        ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
+        RowLockContext lockContext = lockedRows.get(rowKey);
+        if (lockContext != null) {
+            lockContext.releaseRowLock();
+        }
+    }
+
+    class RowLockContext {
+        private final ImmutableBytesPtr rowKey;
+        // TODO: consider making this non atomic. It's only saving one
+        // synchronization in the case of cleanup() when more than one
+        // thread is holding on to the lock.
+        private final AtomicInteger count = new AtomicInteger(0);
+        private final ReentrantLock reentrantLock = new ReentrantLock(true);
+        // TODO: remove once we can pass List<RowLock> as needed through
+        // coprocessor calls.
+        private volatile RowLockImpl rowLock = RowLockImpl.UNINITIALIZED;
+        private String threadName;
+
+        RowLockContext(ImmutableBytesPtr rowKey) {
+            this.rowKey = rowKey;
+        }
+
+        RowLockImpl newRowLock() {
+            count.incrementAndGet();
+            synchronized (this) {
+                // rowLock is nulled out by cleanUp() once this context has been
+                // removed from lockedRows; returning null makes lockRow() loop
+                // and install a fresh context.
+                if (rowLock != null) {
+                    rowLock = new RowLockImpl(this, reentrantLock);
+                    return rowLock;
+                } else {
+                    return null;
+                }
+            }
+        }
+
+        void releaseRowLock() {
+            synchronized (this) {
+                if (rowLock != null) {
+                    rowLock.release();
+                }
+            }
+        }
+        
+        void cleanUp() {
+            long c = count.decrementAndGet();
+            if (c <= 0) {
+                synchronized (this) {
+                    if (count.get() <= 0 && rowLock != null){
+                        rowLock = null;
+                        RowLockContext removed = lockedRows.remove(rowKey);
+                        assert removed == this: "we should never remove a different context";
+                    }
+                }
+            }
+        }
+
+        void setThreadName(String threadName) {
+            this.threadName = threadName;
+        }
+
+        @Override
+        public String toString() {
+            return "RowLockContext{" +
+                    "row=" + rowKey +
+                    ", readWriteLock=" + reentrantLock +
+                    ", count=" + count +
+                    ", threadName=" + threadName +
+                    '}';
+        }
+    }
+
+    /**
+     * Class used to represent a lock on a row.
+     */
+    public static class RowLockImpl implements RowLock {
+        static final RowLockImpl UNINITIALIZED = new RowLockImpl();
+        private final RowLockContext context;
+        private final Lock lock;
+
+        private RowLockImpl() {
+            context = null;
+            lock = null;
+        }
+        
+        RowLockImpl(RowLockContext context, Lock lock) {
+            this.context = context;
+            this.lock = lock;
+        }
+
+        Lock getLock() {
+            return lock;
+        }
+
+        @Override
+        public void release() {
+            lock.unlock();
+            context.cleanUp();
+        }
+
+        @Override
+        public String toString() {
+            return "RowLockImpl{" +
+                    "context=" + context +
+                    ", lock=" + lock +
+                    '}';
+        }
+    }
+
+    /**
+     * Row lock held by a given thread.
+     * One thread may acquire multiple locks on the same row simultaneously.
+     * The locks must be released by calling release() from the same thread.
+     */
+    public interface RowLock {
+        /**
+         * Release the given lock.  If there are no remaining locks held by the current thread
+         * then unlock the row and allow other threads to acquire the lock.
+         * @throws IllegalArgumentException if called by a different thread than the lock owning
+         *     thread
+         */
+        void release();
+    }
+
+}
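
As a usage note, the RowLock handle returned by lockRow can release the lock
directly when acquire and release happen in the same scope. The following
hypothetical snippet (the API is from this commit; the scenario is invented)
shows both styles; Indexer itself must use the key-based unlockRow because
RowLock instances can't cross coprocessor hooks:

    import java.io.IOException;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.LockManager;
    import org.apache.phoenix.hbase.index.LockManager.RowLock;

    public class LockManagerUsage {
        public static void main(String[] args) throws IOException {
            LockManager lockManager = new LockManager();
            byte[] row = Bytes.toBytes("foo");
            // Wait up to 10s for the lock; throws TimeoutIOException on timeout.
            RowLock lock = lockManager.lockRow(row, 10000);
            try {
                // ... mutate the row / build index updates against a stable snapshot ...
            } finally {
                lock.release(); // same-scope style
            }
            // Cross-hook style: re-acquire, then unlock by row key alone.
            lockManager.lockRow(row, 10000);
            lockManager.unlockRow(row);
        }
    }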

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index c015a77..0567d35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -21,8 +21,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,12 +34,6 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
-import org.apache.phoenix.hbase.index.parallel.Task;
-import org.apache.phoenix.hbase.index.parallel.TaskBatch;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
-
-import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Manage the building of index updates from primary table updates.
@@ -90,7 +82,8 @@ public class IndexBuildManager implements Stoppable {
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
     ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
-      results.addAll(delegate.getIndexUpdate(m, indexMetaData));
+      Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+      results.addAll(updates);
     }
     return results;
   }
@@ -139,5 +132,4 @@ public class IndexBuildManager implements Stoppable {
   public IndexBuilder getBuilderForTesting() {
     return this.delegate;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 012c663..8a5a8e4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -838,6 +838,8 @@ public class TestUtil {
     public static void dumpTable(HTableInterface table) throws IOException {
         System.out.println("************ dumping " + table + " **************");
         Scan s = new Scan();
+        s.setRaw(true);
+        s.setMaxVersions();
         try (ResultScanner scanner = table.getScanner(s)) {
             Result result = null;
             while ((result = scanner.next()) != null) {