You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/07/31 17:58:34 UTC
phoenix git commit: PHOENIX-4053 Lock row exclusively when necessary for mutable secondary indexing
Repository: phoenix
Updated Branches:
refs/heads/master 9c458fa3d -> 54d9e1c36
PHOENIX-4053 Lock row exclusively when necessary for mutable secondary indexing
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/54d9e1c3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/54d9e1c3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/54d9e1c3
Branch: refs/heads/master
Commit: 54d9e1c36c46e7c50c29def08cf866599c7a4e45
Parents: 9c458fa
Author: James Taylor <ja...@apache.org>
Authored: Mon Jul 31 10:57:22 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Mon Jul 31 10:57:22 2017 -0700
----------------------------------------------------------------------
.../phoenix/end2end/ConcurrentMutationsIT.java | 405 +++++++++++++++++++
.../org/apache/phoenix/hbase/index/Indexer.java | 61 ++-
.../apache/phoenix/hbase/index/LockManager.java | 252 ++++++++++++
.../hbase/index/builder/IndexBuildManager.java | 12 +-
.../java/org/apache/phoenix/util/TestUtil.java | 2 +
5 files changed, 713 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
new file mode 100644
index 0000000..19cb70e
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class ConcurrentMutationsIT extends BaseUniqueNamesOwnClusterIT {
+ private static final Random RAND = new Random();
+ private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
+ private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
+ private static final int ROW_LOCK_WAIT_TIME = 10000;
+
+ private final Object lock = new Object();
+ private long scn = 100;
+
+ private static void addDelayingCoprocessor(Connection conn, String tableName) throws SQLException, IOException {
+ int priority = QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY + 100;
+ ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
+ HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
+ descriptor.addCoprocessor(DelayingRegionObserver.class.getName(), null, priority, null);
+ HBaseAdmin admin = services.getAdmin();
+ try {
+ admin.modifyTable(Bytes.toBytes(tableName), descriptor);
+ } finally {
+ admin.close();
+ }
+ }
+
+ @Test
+ @Ignore
+ public void testSynchronousDeletesAndUpsertValues() throws Exception {
+ final String tableName = generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))");
+ addDelayingCoprocessor(conn, tableName);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)");
+ final CountDownLatch doneSignal = new CountDownLatch(2);
+ Runnable r1 = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(20);
+ synchronized (lock) {
+ scn += 10;
+ PhoenixConnection conn = null;
+ try {
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+ conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+ conn.setAutoCommit(true);
+ conn.createStatement().execute("DELETE FROM " + tableName);
+ } finally {
+ if (conn != null) conn.close();
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Runnable r2 = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ int nRowsToUpsert = 1000;
+ for (int i = 0; i < nRowsToUpsert; i++) {
+ synchronized(lock) {
+ scn += 10;
+ PhoenixConnection conn = null;
+ try {
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+ conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class);
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)");
+ if ((i % 20) == 0 || i == nRowsToUpsert-1 ) {
+ conn.commit();
+ }
+ } finally {
+ if (conn != null) conn.close();
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Thread t1 = new Thread(r1);
+ t1.start();
+ Thread t2 = new Thread(r2);
+ t2.start();
+
+ doneSignal.await(60, TimeUnit.SECONDS);
+ long count1 = getRowCount(conn, tableName);
+ long count2 = getRowCount(conn, indexName);
+ assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 == count2);
+ }
+
+ @Test
+ @Ignore
+ public void testConcurrentDeletesAndUpsertValues() throws Exception {
+ final String tableName = generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))");
+ addDelayingCoprocessor(conn, tableName);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)");
+ final CountDownLatch doneSignal = new CountDownLatch(2);
+ Runnable r1 = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.setAutoCommit(true);
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(20);
+ conn.createStatement().execute("DELETE FROM " + tableName);
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Runnable r2 = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ for (int i = 0; i < 1000; i++) {
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + (i % 10) + ", 0, 1)");
+ if ((i % 20) == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Thread t1 = new Thread(r1);
+ t1.start();
+ Thread t2 = new Thread(r2);
+ t2.start();
+
+ doneSignal.await(60, TimeUnit.SECONDS);
+ long count1 = getRowCount(conn, tableName);
+ long count2 = getRowCount(conn, indexName);
+ assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 == count2);
+ }
+
+ @Test
+ public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
+ final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER)");
+ addDelayingCoprocessor(conn, tableName);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v)");
+ final CountDownLatch doneSignal = new CountDownLatch(2);
+ final String[] failedMsg = new String[1];
+ Runnable r1 = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)");
+ conn.commit();
+ } catch (Exception e) {
+ failedMsg[0] = e.getMessage();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Runnable r2 = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',3)");
+ conn.commit();
+ } catch (Exception e) {
+ failedMsg[0] = e.getMessage();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Thread t1 = new Thread(r1);
+ t1.start();
+ Thread t2 = new Thread(r2);
+ t2.start();
+
+ doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS);
+ assertNull(failedMsg[0], failedMsg[0]);
+ }
+
+ @Test
+ public void testLockUntilMVCCAdvanced() throws Exception {
+ final String tableName = MVCC_LOCK_TEST_TABLE_PREFIX + generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER)");
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v,k)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)");
+ conn.commit();
+ addDelayingCoprocessor(conn, tableName);
+ final CountDownLatch doneSignal = new CountDownLatch(2);
+ final String[] failedMsg = new String[1];
+ Runnable r1 = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)");
+ conn.commit();
+ } catch (Exception e) {
+ failedMsg[0] = e.getMessage();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Runnable r2 = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)");
+ conn.commit();
+ } catch (Exception e) {
+ failedMsg[0] = e.getMessage();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Thread t1 = new Thread(r1);
+ t1.start();
+ Thread t2 = new Thread(r2);
+ t2.start();
+
+ doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS);
+
+ TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(tableName)));
+ TestUtil.dumpTable(conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexName)));
+
+ long count1 = getRowCount(conn, tableName);
+ long count2 = getRowCount(conn, indexName);
+ assertTrue("Expected table row count ( " + count1 + ") to match index row count (" + count2 + ")", count1 == count2);
+
+ ResultSet rs1 = conn.createStatement().executeQuery("SELECT * FROM " + indexName);
+ assertTrue(rs1.next());
+ ResultSet rs2 = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ * FROM " + tableName + " WHERE k = '" + rs1.getString(2) + "'");
+ assertTrue("Could not find row in table where k = '" + rs1.getString(2) + "'", rs2.next());
+ assertEquals(rs1.getInt(1), rs2.getInt(2));
+ assertFalse(rs1.next());
+ assertFalse(rs2.next());
+ }
+
+ private static long getRowCount(Connection conn, String tableName) throws SQLException {
+ ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName);
+ assertTrue(rs.next());
+ return rs.getLong(1);
+ }
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(10);
+ clientProps.put(QueryServices.DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB, Integer.toString(0));
+ Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+ serverProps.put("hbase.rowlock.wait.duration", Integer.toString(ROW_LOCK_WAIT_TIME));
+ serverProps.put(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(3));
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+
+ public static class DelayingRegionObserver extends SimpleRegionObserver {
+ private volatile boolean lockedTableRow;
+
+ @Override
+ public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ try {
+ String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString();
+ if (tableName.startsWith(MVCC_LOCK_TEST_TABLE_PREFIX)) {
+ Thread.sleep(ROW_LOCK_WAIT_TIME/2); // Wait long enough that they'll both have the same mvcc
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ @Override
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ try {
+ String tableName = c.getEnvironment().getRegionInfo().getTable().getNameAsString();
+ if (tableName.startsWith(LOCK_TEST_TABLE_PREFIX)) {
+ if (lockedTableRow) {
+ throw new DoNotRetryIOException("Expected lock in preBatchMutate to be exclusive, but it wasn't for row " + Bytes.toStringBinary(miniBatchOp.getOperation(0).getRow()));
+ }
+ lockedTableRow = true;
+ Thread.sleep(ROW_LOCK_WAIT_TIME + 2000);
+ }
+ Thread.sleep(Math.abs(RAND.nextInt()) % 10);
+ } catch (InterruptedException e) {
+ } finally {
+ lockedTableRow = false;
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index ea5bf4f..8523977 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -117,6 +117,7 @@ public class Indexer extends BaseRegionObserver {
protected IndexWriter writer;
protected IndexBuildManager builder;
+ private LockManager lockManager;
/** Configuration key for the {@link IndexBuilder} to use */
public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
@@ -162,16 +163,19 @@ public class Indexer extends BaseRegionObserver {
private long slowPreWALRestoreThreshold;
private long slowPostOpenThreshold;
private long slowPreIncrementThreshold;
-
+ private int rowLockWaitDuration;
+
public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY;
- public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil
+ public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil
.encodeMaxPatchVersion(0, 94);
- public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil
+ public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil
.encodeVersion("0.94.0");
- private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil
+ private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil
.encodeVersion("0.94.9");
+ private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+
@Override
public void start(CoprocessorEnvironment e) throws IOException {
try {
@@ -206,6 +210,10 @@ public class Indexer extends BaseRegionObserver {
DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
// setup the actual index writer
this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer");
+
+ this.rowLockWaitDuration = clonedConfig.getInt("hbase.rowlock.wait.duration",
+ DEFAULT_ROWLOCK_WAIT_DURATION);
+ this.lockManager = new LockManager();
// Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
@@ -346,8 +354,9 @@ public class Indexer extends BaseRegionObserver {
"Somehow didn't return an index update but also didn't propagate the failure to the client!");
}
- private static final OperationStatus SUCCESS = new OperationStatus(OperationStatusCode.SUCCESS);
-
+ private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
+ private static final OperationStatus FAILURE = new OperationStatus(OperationStatusCode.FAILURE, "Unable to acquire row lock");
+
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
@@ -365,7 +374,7 @@ public class Indexer extends BaseRegionObserver {
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
if (this.builder.isAtomicOp(m)) {
- miniBatchOp.setOperationStatus(i, SUCCESS);
+ miniBatchOp.setOperationStatus(i, IGNORE);
continue;
}
// skip this mutation if we aren't enabling indexing
@@ -373,13 +382,40 @@ public class Indexer extends BaseRegionObserver {
// should be indexed, which means we need to expose another method on the builder. Such is the
// way optimization go though.
if (this.builder.isEnabled(m)) {
+ boolean success = false;
+ try {
+ lockManager.lockRow(m.getRow(), rowLockWaitDuration);
+ success = true;
+ } finally {
+ if (!success) {
+ // We're throwing here as a result of either a timeout while waiting
+ // for the row lock or an interrupt. Either way, the lock on the
+ // current row was unsuccessful and we won't be locking any more rows
+ // since we're throwing. By setting the operation status to FAILURE
+ // here, we prevent the attempt to unlock rows we've never locked when
+ // postBatchMutateIndispensably is executed. We're very limited wrt
+ // the state that can be shared between the batch mutate coprocessor
+ // calls (see HBASE-18127).
+ // Note that we shouldn't necessarily be throwing here, since we're
+ // essentially failing the data write because we can't do the locking
+ // necessary for performing consistent index maintenance. We'd ideally
+ // want to go through the index failure policy to determine what action
+ // to perform. We currently cannot ignore this lock failure as we lack
+ // the ability to keep that state (PHOENIX-4055).
+ for (int j = i; j < miniBatchOp.size(); j++) {
+ miniBatchOp.setOperationStatus(j,FAILURE);
+ }
+ }
+ }
Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ?
defaultDurability : m.getDurability();
if (effectiveDurablity.ordinal() > durability.ordinal()) {
durability = effectiveDurablity;
}
- // add the mutation to the batch set
+ // TODO: remove this code as Phoenix prevents any duplicate
+ // rows in the batch mutation from the client side (PHOENIX-4054).
+ // Add the mutation to the batch set
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
MultiMutation stored = mutations.get(row);
// we haven't seen this row before, so add it
@@ -390,7 +426,7 @@ public class Indexer extends BaseRegionObserver {
stored.addAll(m);
}
}
-
+
// early exit if it turns out we don't have any edits
if (mutations.isEmpty()) {
return;
@@ -403,6 +439,7 @@ public class Indexer extends BaseRegionObserver {
edit = new WALEdit();
miniBatchOp.setWalEdit(0, edit);
}
+
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -488,6 +525,12 @@ public class Indexer extends BaseRegionObserver {
}
long start = EnvironmentEdgeManager.currentTimeMillis();
try {
+ for (int i = 0; i < miniBatchOp.size(); i++) {
+ OperationStatus status = miniBatchOp.getOperationStatus(i);
+ if (status != IGNORE && status != FAILURE) {
+ lockManager.unlockRow(miniBatchOp.getOperation(i).getRow());
+ }
+ }
this.builder.batchCompleted(miniBatchOp);
if (success) { // if miniBatchOp was successfully written, write index updates
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
new file mode 100644
index 0000000..02e4c3c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/LockManager.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ *
+ * Class, copied for the most part from HRegion.getRowLockInternal implementation
+ * that manages reentrant row locks based on the row key. Phoenix needs to manage
+ * it's own locking due to secondary indexes needing a consistent snapshot from
+ * the time the mvcc is acquired until the time it is advanced (PHOENIX-4053).
+ *
+ */
+public class LockManager {
+ private static final Log LOG = LogFactory.getLog(LockManager.class);
+
+ private final ConcurrentHashMap<ImmutableBytesPtr, RowLockContext> lockedRows =
+ new ConcurrentHashMap<ImmutableBytesPtr, RowLockContext>();
+
+ public LockManager () {
+ }
+
+ /**
+ * Lock the row or throw otherwise
+ * @param row the row key
+ * @return RowLock used to eventually release the lock
+ * @throws TimeoutIOException if the lock could not be acquired within the
+ * allowed rowLockWaitDuration and InterruptedException if interrupted while
+ * waiting to acquire lock.
+ */
+ public RowLock lockRow(byte[] row, int waitDuration) throws IOException {
+ // create an object to use a a key in the row lock map
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
+
+ RowLockContext rowLockContext = null;
+ RowLockImpl result = null;
+ TraceScope traceScope = null;
+
+ // If we're tracing start a span to show how long this took.
+ if (Trace.isTracing()) {
+ traceScope = Trace.startSpan("LockManager.getRowLock");
+ traceScope.getSpan().addTimelineAnnotation("Getting a lock");
+ }
+
+ boolean success = false;
+ try {
+ // Keep trying until we have a lock or error out.
+ // TODO: do we need to add a time component here?
+ while (result == null) {
+
+ // Try adding a RowLockContext to the lockedRows.
+ // If we can add it then there's no other transactions currently running.
+ rowLockContext = new RowLockContext(rowKey);
+ RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext);
+
+ // if there was a running transaction then there's already a context.
+ if (existingContext != null) {
+ rowLockContext = existingContext;
+ }
+
+ result = rowLockContext.newRowLock();
+ }
+ if (!result.getLock().tryLock(waitDuration, TimeUnit.MILLISECONDS)) {
+ if (traceScope != null) {
+ traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
+ }
+ throw new TimeoutIOException("Timed out waiting for lock for row: " + rowKey);
+ }
+ rowLockContext.setThreadName(Thread.currentThread().getName());
+ success = true;
+ return result;
+ } catch (InterruptedException ie) {
+ LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
+ InterruptedIOException iie = new InterruptedIOException();
+ iie.initCause(ie);
+ if (traceScope != null) {
+ traceScope.getSpan().addTimelineAnnotation("Interrupted exception getting row lock");
+ }
+ Thread.currentThread().interrupt();
+ throw iie;
+ } finally {
+ // On failure, clean up the counts just in case this was the thing keeping the context alive.
+ if (!success && rowLockContext != null) rowLockContext.cleanUp();
+ if (traceScope != null) {
+ traceScope.close();
+ }
+ }
+ }
+
+ /**
+ * Unlock the row. We need this stateless way of unlocking because
+ * we have no means of passing the RowLock instances between
+ * coprocessor calls (see HBASE-18482). Once we have that, we
+ * can have the caller collect RowLock instances and free when
+ * needed.
+ * @param row the row key
+ * @throws IOException
+ */
+ public void unlockRow(byte[] row) throws IOException {
+ ImmutableBytesPtr rowKey = new ImmutableBytesPtr(row);
+ RowLockContext lockContext = lockedRows.get(rowKey);
+ if (lockContext != null) {
+ lockContext.releaseRowLock();
+ }
+ }
+
+ class RowLockContext {
+ private final ImmutableBytesPtr rowKey;
+ // TODO: consider making this non atomic. It's only saving one
+ // synchronization in the case of cleanup() when more than one
+ // thread is holding on to the lock.
+ private final AtomicInteger count = new AtomicInteger(0);
+ private final ReentrantLock reentrantLock = new ReentrantLock(true);
+ // TODO: remove once we can pass List<RowLock> as needed through
+ // coprocessor calls.
+ private volatile RowLockImpl rowLock = RowLockImpl.UNINITIALIZED;
+ private String threadName;
+
+ RowLockContext(ImmutableBytesPtr rowKey) {
+ this.rowKey = rowKey;
+ }
+
+ RowLockImpl newRowLock() {
+ count.incrementAndGet();
+ synchronized (this) {
+ if (rowLock != null) {
+ rowLock = new RowLockImpl(this, reentrantLock);
+ return rowLock;
+ } else {
+ return null;
+ }
+ }
+ }
+
+ void releaseRowLock() {
+ synchronized (this) {
+ if (rowLock != null) {
+ rowLock.release();
+ }
+ }
+ }
+
+ void cleanUp() {
+ long c = count.decrementAndGet();
+ if (c <= 0) {
+ synchronized (this) {
+ if (count.get() <= 0 && rowLock != null){
+ rowLock = null;
+ RowLockContext removed = lockedRows.remove(rowKey);
+ assert removed == this: "we should never remove a different context";
+ }
+ }
+ }
+ }
+
+ void setThreadName(String threadName) {
+ this.threadName = threadName;
+ }
+
+ @Override
+ public String toString() {
+ return "RowLockContext{" +
+ "row=" + rowKey +
+ ", readWriteLock=" + reentrantLock +
+ ", count=" + count +
+ ", threadName=" + threadName +
+ '}';
+ }
+ }
+
+ /**
+ * Class used to represent a lock on a row.
+ */
+ public static class RowLockImpl implements RowLock {
+ static final RowLockImpl UNINITIALIZED = new RowLockImpl();
+ private final RowLockContext context;
+ private final Lock lock;
+
+ private RowLockImpl() {
+ context = null;
+ lock = null;
+ }
+
+ RowLockImpl(RowLockContext context, Lock lock) {
+ this.context = context;
+ this.lock = lock;
+ }
+
+ Lock getLock() {
+ return lock;
+ }
+
+ @Override
+ public void release() {
+ lock.unlock();
+ context.cleanUp();
+ }
+
+ @Override
+ public String toString() {
+ return "RowLockImpl{" +
+ "context=" + context +
+ ", lock=" + lock +
+ '}';
+ }
+ }
+
+ /**
+ * Row lock held by a given thread.
+ * One thread may acquire multiple locks on the same row simultaneously.
+ * The locks must be released by calling release() from the same thread.
+ */
+ public interface RowLock {
+ /**
+ * Release the given lock. If there are no remaining locks held by the current thread
+ * then unlock the row and allow other threads to acquire the lock.
+ * @throws IllegalArgumentException if called by a different thread than the lock owning
+ * thread
+ */
+ void release();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index c015a77..0567d35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -21,8 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,12 +34,6 @@ import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
-import org.apache.phoenix.hbase.index.parallel.Task;
-import org.apache.phoenix.hbase.index.parallel.TaskBatch;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
-
-import com.google.common.util.concurrent.MoreExecutors;
/**
* Manage the building of index updates from primary table updates.
@@ -90,7 +82,8 @@ public class IndexBuildManager implements Stoppable {
// Avoid the Object overhead of the executor when it's not actually parallelizing anything.
ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
for (Mutation m : mutations) {
- results.addAll(delegate.getIndexUpdate(m, indexMetaData));
+ Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+ results.addAll(updates);
}
return results;
}
@@ -139,5 +132,4 @@ public class IndexBuildManager implements Stoppable {
public IndexBuilder getBuilderForTesting() {
return this.delegate;
}
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/54d9e1c3/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 012c663..8a5a8e4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -838,6 +838,8 @@ public class TestUtil {
public static void dumpTable(HTableInterface table) throws IOException {
System.out.println("************ dumping " + table + " **************");
Scan s = new Scan();
+ s.setRaw(true);;
+ s.setMaxVersions();
try (ResultScanner scanner = table.getScanner(s)) {
Result result = null;
while ((result = scanner.next()) != null) {