Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/26 20:14:46 UTC
[1/4] phoenix git commit: PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.2 fdfdb41dd -> 6f65a7935
PHOENIX-4230 Write index updates in postBatchMutateIndispensably for transactional tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d3ea3243
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d3ea3243
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d3ea3243
Branch: refs/heads/4.x-HBase-1.2
Commit: d3ea32438e9ccdd1ea66ec752e5c563ffa2ba17b
Parents: fdfdb41
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 18:52:48 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 13:09:52 2017 -0700
----------------------------------------------------------------------
.../index/PhoenixTransactionalIndexer.java | 79 +++++++++++++++++---
1 file changed, 70 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
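The heart of this change is splitting index maintenance across two coprocessor hooks: preBatchMutate builds the index updates, and postBatchMutateIndispensably writes them only after the data-table batch has actually succeeded. Below is a minimal sketch of that pattern, with buildIndexUpdates and writeToIndexTables as hypothetical stand-ins for the codec/IndexWriter plumbing in the diff; the ThreadLocal hand-off is the stopgap the code comments call out until HBASE-18127 is available.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
    import org.apache.hadoop.hbase.util.Pair;

    public class TwoPhaseIndexerSketch extends BaseRegionObserver {
        // Per-thread hand-off between the two hooks; HBase 1.x offers no
        // per-operation coprocessor state, hence the workaround in the diff.
        private final ThreadLocal<Collection<Pair<Mutation, byte[]>>> pending =
                new ThreadLocal<Collection<Pair<Mutation, byte[]>>>();

        @Override
        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            // Phase 1: compute the index updates, but defer the actual write.
            pending.set(buildIndexUpdates(miniBatchOp));
        }

        @Override
        public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
                MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean success) throws IOException {
            try {
                Collection<Pair<Mutation, byte[]>> updates = pending.get();
                // Phase 2: write index rows only if the data write committed.
                if (success && updates != null && !updates.isEmpty()) {
                    writeToIndexTables(updates);
                }
            } finally {
                pending.remove(); // never leak state across batches on this thread
            }
        }

        // Hypothetical helpers; the real code uses PhoenixIndexCodec and IndexWriter.
        private Collection<Pair<Mutation, byte[]>> buildIndexUpdates(
                MiniBatchOperationInProgress<Mutation> miniBatchOp) {
            return Collections.emptyList();
        }

        private void writeToIndexTables(Collection<Pair<Mutation, byte[]>> updates)
                throws IOException {
        }
    }

The diff below implements the same shape, adding tracing and error translation around each phase.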
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d3ea3243/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 5444360..969378d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -17,6 +17,11 @@
*/
package org.apache.phoenix.index;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
+import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -36,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
@@ -99,6 +105,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class);
+ // Hack to get around not being able to save any state between
+ // coprocessor calls. TODO: remove after HBASE-18127 when available
+ private static class BatchMutateContext {
+ public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList();
+ }
+
+ private ThreadLocal<BatchMutateContext> batchMutateContext =
+ new ThreadLocal<BatchMutateContext>();
+
private PhoenixIndexCodec codec;
private IndexWriter writer;
private boolean stopped;
@@ -117,9 +132,15 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
*/
clonedConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+ // lower the number of rpc retries. We inherit config from HConnectionManager#setServerSideHConnectionRetries,
+ // which by default uses a multiplier of 10. That is too many retries for our synchronous index writes
+ clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ env.getConfiguration().getInt(INDEX_WRITER_RPC_RETRIES_NUMBER,
+ DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER));
+ clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, env.getConfiguration()
+ .getInt(INDEX_WRITER_RPC_PAUSE, DEFAULT_INDEX_WRITER_RPC_PAUSE));
DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
// setup the actual index writer
- // setup the actual index writer
// For transactional tables, we keep the index active upon a write failure
// since we have the all versus none behavior for transactions.
this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
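The retry tuning in the hunk above exists because a region server connection inherits config from HConnectionManager#setServerSideHConnectionRetries, which multiplies client retries by ten; a synchronous index write retrying that long would stall the data-table batch. A sketch of the idea, with literal values standing in for the INDEX_WRITER_RPC_* defaults (their actual values are not shown in this mail):

    // Clone so the region server's own Configuration is never mutated.
    Configuration clonedConfig = new Configuration(env.getConfiguration());
    // Fail fast: a few quick retries instead of the inherited 10x multiplier.
    clonedConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); // illustrative value
    clonedConfig.setInt(HConstants.HBASE_CLIENT_PAUSE, 100);        // illustrative value, ms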
@@ -154,6 +175,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
};
}
+
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
@@ -164,6 +186,9 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
return;
}
+ BatchMutateContext context = new BatchMutateContext();
+ setBatchMutateContext(c, context);
+
Map<String,byte[]> updateAttributes = m.getAttributesMap();
PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
byte[] txRollbackAttribute = m.getAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY);
@@ -176,15 +201,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
// get the index updates for all elements in this batch
- indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
+ context.indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), txRollbackAttribute);
current.addTimelineAnnotation("Built index updates, doing preStep");
- TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-
- // no index updates, so we are done
- if (!indexUpdates.isEmpty()) {
- this.writer.write(indexUpdates, true);
- }
+ TracingUtils.addAnnotation(current, "index update count", context.indexUpdates.size());
} catch (Throwable t) {
String msg = "Failed to update index with entries:" + indexUpdates;
LOG.error(msg, t);
@@ -192,7 +212,48 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
}
- public static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
+ @Override
+ public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
+ BatchMutateContext context = getBatchMutateContext(c);
+ if (context == null || context.indexUpdates == null) {
+ return;
+ }
+ // get the current span, or just use a null-span to avoid a bunch of if statements
+ try (TraceScope scope = Trace.startSpan("Starting to write index updates")) {
+ Span current = scope.getSpan();
+ if (current == null) {
+ current = NullSpan.INSTANCE;
+ }
+
+ if (success) { // if miniBatchOp was successfully written, write index updates
+ if (!context.indexUpdates.isEmpty()) {
+ this.writer.write(context.indexUpdates, true);
+ }
+ current.addTimelineAnnotation("Wrote index updates");
+ }
+ } catch (Throwable t) {
+ String msg = "Failed to write index updates:" + context.indexUpdates;
+ LOG.error(msg, t);
+ ServerUtil.throwIOException(msg, t);
+ } finally {
+ removeBatchMutateContext(c);
+ }
+ }
+
+ private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
+ this.batchMutateContext.set(context);
+ }
+
+ private BatchMutateContext getBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+ return this.batchMutateContext.get();
+ }
+
+ private void removeBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c) {
+ this.batchMutateContext.remove();
+ }
+
+ private static void addMutation(Map<ImmutableBytesPtr, MultiMutation> mutations, ImmutableBytesPtr row, Mutation m) {
MultiMutation stored = mutations.get(row);
// we haven't seen this row before, so add it
if (stored == null) {
[3/4] phoenix git commit: PHOENIX-3815 Only disable indexes on which write failures occurred (Vincent Poon)
Posted by ja...@apache.org.
PHOENIX-3815 Only disable indexes on which write failures occurred (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4354a2cb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4354a2cb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4354a2cb
Branch: refs/heads/4.x-HBase-1.2
Commit: 4354a2cb50505c0467ed4618c0a4a30bb5a9e02b
Parents: d570c82
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 22:37:01 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 13:10:55 2017 -0700
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 36 ++-
.../phoenix/hbase/index/write/IndexWriter.java | 8 +-
.../hbase/index/write/RecoveryIndexWriter.java | 1 -
.../TrackingParallelWriterIndexCommitter.java | 245 +++++++++++++++++++
.../recovery/StoreFailuresInCachePolicy.java | 1 +
.../TrackingParallelWriterIndexCommitter.java | 238 ------------------
.../index/PhoenixIndexFailurePolicy.java | 9 +
.../index/PhoenixTransactionalIndexer.java | 7 +-
.../hbase/index/write/TestIndexWriter.java | 89 +------
.../index/write/TestParalleIndexWriter.java | 4 +-
.../write/TestParalleWriterIndexCommitter.java | 4 +-
11 files changed, 295 insertions(+), 347 deletions(-)
----------------------------------------------------------------------
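The key mechanism in this commit is that the tracking committer waits for every per-table write task and reports exactly which index tables failed via MultiIndexWriteFailureException#getFailedTables(), letting PhoenixIndexFailurePolicy disable only those. A rough sketch of the consuming side, assuming committer is a TrackingParallelWriterIndexCommitter, indexUpdates a Multimap<HTableInterfaceReference, Mutation>, and disableIndex a hypothetical stand-in for the policy's SYSTEM.CATALOG update:

    try {
        // Blocks until every per-table write task has been attempted.
        committer.write(indexUpdates, true);
    } catch (MultiIndexWriteFailureException e) {
        for (HTableInterfaceReference failedTable : e.getFailedTables()) {
            disableIndex(failedTable); // hypothetical: mark only this index DISABLE
        }
        // Indexes absent from getFailedTables() keep their ACTIVE state.
    }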
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 74a0389..7daee4e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -60,6 +60,7 @@ import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
@@ -226,8 +227,8 @@ public class MutableIndexFailureIT extends BaseTest {
@Test
public void testIndexWriteFailure() throws Exception {
String secondIndexName = "B_" + FailingRegionObserver.FAIL_INDEX_NAME;
-// String thirdIndexName = "C_" + INDEX_NAME;
-// String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName);
+ String thirdIndexName = "C_IDX";
+ String thirdFullIndexName = SchemaUtil.getTableName(schema, thirdIndexName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
try (Connection conn = driver.connect(url, props)) {
@@ -250,8 +251,8 @@ public class MutableIndexFailureIT extends BaseTest {
// check the drop index.
conn.createStatement().execute(
"CREATE " + (!localIndex ? "LOCAL " : "") + " INDEX " + secondIndexName + " ON " + fullTableName + " (v2) INCLUDE (v1)");
-// conn.createStatement().execute(
-// "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+ conn.createStatement().execute(
+ "CREATE " + (localIndex ? "LOCAL " : "") + " INDEX " + thirdIndexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
@@ -266,9 +267,9 @@ public class MutableIndexFailureIT extends BaseTest {
assertTrue(rs.next());
assertEquals(secondIndexName, rs.getString(3));
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-// assertTrue(rs.next());
-// assertEquals(thirdIndexName, rs.getString(3));
-// assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+ assertTrue(rs.next());
+ assertEquals(thirdIndexName, rs.getString(3));
+ assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
initializeTable(conn, fullTableName);
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
@@ -301,6 +302,10 @@ public class MutableIndexFailureIT extends BaseTest {
assertTrue(PIndexState.ACTIVE.toString().equals(indexState) || PIndexState.PENDING_ACTIVE.toString().equals(indexState));
} else {
assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
+ // non-failing index should remain active
+ ResultSet thirdRs = conn.createStatement().executeQuery(getSysCatQuery(thirdIndexName));
+ assertTrue(thirdRs.next());
+ assertEquals(PIndexState.ACTIVE.getSerializedValue(), thirdRs.getString(1));
}
assertFalse(rs.next());
@@ -330,8 +335,7 @@ public class MutableIndexFailureIT extends BaseTest {
assertEquals("d", rs.getString(2));
assertFalse(rs.next());
}
- // Comment back in when PHOENIX-3815 is fixed
-// validateDataWithIndex(conn, fullTableName, thirdFullIndexName, false);
+ IndexScrutiny.scrutinizeIndex(conn, fullTableName, thirdFullIndexName);
if (!failRebuildTask) {
// re-enable index table
@@ -357,10 +361,7 @@ public class MutableIndexFailureIT extends BaseTest {
checkStateAfterRebuild(conn, fullIndexName, PIndexState.DISABLE);
// verify that the index was marked as disabled and the index disable
// timestamp set to 0
- String q =
- "SELECT INDEX_STATE, INDEX_DISABLE_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = '"
- + schema + "' AND TABLE_NAME = '" + indexName + "'"
- + " AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL";
+ String q = getSysCatQuery(indexName);
try (ResultSet r = conn.createStatement().executeQuery(q)) {
assertTrue(r.next());
assertEquals(PIndexState.DISABLE.getSerializedValue(), r.getString(1));
@@ -374,6 +375,15 @@ public class MutableIndexFailureIT extends BaseTest {
}
}
+ private String getSysCatQuery(String iName) {
+ String q =
+ "SELECT INDEX_STATE, INDEX_DISABLE_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM = '"
+ + schema + "' AND TABLE_NAME = '" + iName + "'"
+ + " AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL";
+ return q;
+ }
+
+
private void checkStateAfterRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException {
if (!transactional) {
assertTrue(TestUtil.checkIndexState(conn,fullIndexName, expectedIndexState, 0l));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index a037e92..6b57025 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -47,7 +47,7 @@ import com.google.common.collect.Multimap;
public class IndexWriter implements Stoppable {
private static final Log LOG = LogFactory.getLog(IndexWriter.class);
- private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
+ public static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class";
private AtomicBoolean stopped = new AtomicBoolean(false);
private IndexCommitter writer;
@@ -66,10 +66,14 @@ public class IndexWriter implements Stoppable {
}
public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
+ return getCommitter(env,TrackingParallelWriterIndexCommitter.class);
+ }
+
+ public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env, Class<? extends IndexCommitter> defaultClass) throws IOException {
Configuration conf = env.getConfiguration();
try {
IndexCommitter committer =
- conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class,
+ conf.getClass(INDEX_COMMITTER_CONF_KEY, defaultClass,
IndexCommitter.class).newInstance();
return committer;
} catch (InstantiationException e) {
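With INDEX_COMMITTER_CONF_KEY now public and getCommitter accepting a default class, a caller can choose a committer per use case while a deployment can still override it globally. A sketch of both paths (the key's value, "index.writer.commiter.class", keeps its historical spelling):

    // Deployment-level override via configuration.
    Configuration conf = env.getConfiguration();
    conf.setClass(IndexWriter.INDEX_COMMITTER_CONF_KEY,
            TrackingParallelWriterIndexCommitter.class, IndexCommitter.class);

    // Caller-level default: used only when the config key is unset.
    IndexCommitter committer =
            IndexWriter.getCommitter(env, TrackingParallelWriterIndexCommitter.class);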
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index be542bb..e340784 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..0449e9e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.CapturingAbortable;
+import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.util.IndexUtil;
+
+import com.google.common.collect.Multimap;
+
+/**
+ * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have been attempted, allowing the caller
+ * to retrieve both the failed and the succeeded index updates. Therefore, this class will be a lot slower in the face
+ * of failures (though just as fast for writes) than the {@link ParallelWriterIndexCommitter}, so it should be used only
+ * when you need to at least attempt all writes and know their result; for instance, this is fine for WAL recovery -
+ * it's not a performance-intensive situation and we want to limit the edits we need to retry.
+ * <p>
+ * On failure in {@link #write(Multimap)}, we throw a {@link MultiIndexWriteFailureException} that contains the list of
+ * {@link HTableInterfaceReference}s that didn't complete successfully.
+ * <p>
+ * Failures to write to the index can happen in several different ways:
+ * <ol>
+ * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}). This causes any
+ * pending tasks to fail whatever they are doing as fast as possible. Any writes that have not begun are not even
+ * attempted and are marked as failures.</li>
+ * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index table is not
+ * available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase exceptions.</li>
+ * </ol>
+ * Regardless of how the write fails, we still wait for all writes to complete before passing the failure back to the
+ * client.
+ */
+public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
+ private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
+
+ public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+ private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+ private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
+
+ private TaskRunner pool;
+ private HTableFactory factory;
+ private CapturingAbortable abortable;
+ private Stoppable stopped;
+ private RegionCoprocessorEnvironment env;
+ private KeyValueBuilder kvBuilder;
+
+ // for testing
+ public TrackingParallelWriterIndexCommitter(String hbaseVersion) {
+ kvBuilder = KeyValueBuilder.get(hbaseVersion);
+ }
+
+ public TrackingParallelWriterIndexCommitter() {
+ }
+
+ @Override
+ public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+ this.env = env;
+ Configuration conf = env.getConfiguration();
+ setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+ ThreadPoolManager.getExecutor(
+ new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+ DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
+ INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
+ this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
+ }
+
+ /**
+ * Setup <tt>this</tt>.
+ * <p>
+ * Exposed for TESTING
+ */
+ void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+ RegionCoprocessorEnvironment env) {
+ this.pool = new WaitForCompletionTaskRunner(pool);
+ this.factory = factory;
+ this.abortable = new CapturingAbortable(abortable);
+ this.stopped = stop;
+ }
+
+ @Override
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException {
+ Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+ TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
+ List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
+ for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+ // get the mutations for each table. We leak the implementation here a little bit to save
+ // doing a complete copy of all the index updates for each table.
+ final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
+ // track each reference so we can get at it easily later, when determining failures
+ final HTableInterfaceReference tableReference = entry.getKey();
+ final RegionCoprocessorEnvironment env = this.env;
+ if (env != null
+ && !allowLocalUpdates
+ && tableReference.getTableName().equals(
+ env.getRegion().getTableDesc().getNameAsString())) {
+ continue;
+ }
+ tables.add(tableReference);
+
+ /*
+ * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
+ * mechanisms: (1) setting aborted or stopped on the IndexWriter, or (2) interrupting the running thread.
+ * The former only works if we are not in the midst of writing the current batch to the table, though we
+ * do check these status variables before starting and before writing the batch. The latter, interrupting
+ * the thread, works in the same situations as well as at some points while writing the batch, depending
+ * on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't document when
+ * it supports an interrupt).
+ */
+ tasks.add(new Task<Boolean>() {
+
+ /**
+ * Do the actual write to the index table.
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ public Boolean call() throws Exception {
+ HTableInterface table = null;
+ try {
+ // this may have been queued, but there was an abort/stop so we try to early exit
+ throwFailureIfDone();
+ if (allowLocalUpdates
+ && env != null
+ && tableReference.getTableName().equals(
+ env.getRegion().getTableDesc().getNameAsString())) {
+ try {
+ throwFailureIfDone();
+ IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
+ return Boolean.TRUE;
+ } catch (IOException ignord) {
+ // if the local write fails, we fall back to the standard (slower) HTable path
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
+ + ignord);
+ }
+ }
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
+ }
+
+ table = factory.getTable(tableReference.get());
+ throwFailureIfDone();
+ table.batch(mutations);
+ } catch (InterruptedException e) {
+ // reset the interrupt status on the thread
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ }
+ return Boolean.TRUE;
+ }
+
+ private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+ if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
+ "Pool closed, not attempting to write to the index!", null); }
+
+ }
+ });
+ }
+
+ List<Boolean> results = null;
+ try {
+ LOG.debug("Waiting on index update tasks to complete...");
+ results = this.pool.submitUninterruptible(tasks);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+ } catch (EarlyExitFailure e) {
+ throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+ }
+
+ // track the failures. We only ever access this on return from our calls, so no extra
+ // synchronization is needed. We could record each failure as we find it, but that adds a
+ // lot of locking overhead, and just doing the copy later is about as efficient.
+ List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+ int index = 0;
+ for (Boolean result : results) {
+ // there was a failure
+ if (result == null) {
+ // we know which table failed by the index of the result
+ failures.add(tables.get(index));
+ }
+ index++;
+ }
+
+ // if any of the tasks failed, then we need to propagate the failure
+ if (failures.size() > 0) {
+ // make the list unmodifiable to avoid any more synchronization concerns
+ throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+ }
+ return;
+ }
+
+ @Override
+ public void stop(String why) {
+ LOG.info("Shutting down " + this.getClass().getSimpleName());
+ this.pool.stop(why);
+ this.factory.shutdown();
+ }
+
+ @Override
+ public boolean isStopped() {
+ return this.stopped.isStopped();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
index 189f970..e28a0bd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
+import org.apache.phoenix.hbase.index.write.TrackingParallelWriterIndexCommitter;
/**
* Tracks any failed writes in The {@link PerRegionIndexWriteCache}, given a
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
deleted file mode 100644
index d2436ba..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
- * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
- * governing permissions and limitations under the License.
- */
-package org.apache.phoenix.hbase.index.write.recovery;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.phoenix.hbase.index.CapturingAbortable;
-import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
-import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
-import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
-import org.apache.phoenix.hbase.index.parallel.Task;
-import org.apache.phoenix.hbase.index.parallel.TaskBatch;
-import org.apache.phoenix.hbase.index.parallel.TaskRunner;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
-import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
-import org.apache.phoenix.hbase.index.table.HTableFactory;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
-import org.apache.phoenix.hbase.index.write.IndexCommitter;
-import org.apache.phoenix.hbase.index.write.IndexWriter;
-import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
-import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
-import org.apache.phoenix.util.IndexUtil;
-
-import com.google.common.collect.Multimap;
-
-/**
- * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have attempted to allow the caller to
- * retrieve the failed and succeeded index updates. Therefore, this class will be a lot slower, in the face of failures,
- * when compared to the {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
- * you need to at least attempt all writes and know their result; for instance, this is fine for doing WAL recovery -
- * it's not a performance intensive situation and we want to limit the the edits we need to retry.
- * <p>
- * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that contains the list of
- * {@link HTableInterfaceReference} that didn't complete successfully.
- * <p>
- * Failures to write to the index can happen several different ways:
- * <ol>
- * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}. This causing any
- * pending tasks to fail whatever they are doing as fast as possible. Any writes that have not begun are not even
- * attempted and marked as failures.</li>
- * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index table is not
- * available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase exceptions.</li>
- * </ol>
- * Regardless of how the write fails, we still wait for all writes to complete before passing the failure back to the
- * client.
- */
-public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
- private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
-
- public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
- private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
- private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.trackingwriter.threads.keepalivetime";
-
- private TaskRunner pool;
- private HTableFactory factory;
- private CapturingAbortable abortable;
- private Stoppable stopped;
- private RegionCoprocessorEnvironment env;
-
- @Override
- public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
- this.env = env;
- Configuration conf = env.getConfiguration();
- setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
- ThreadPoolManager.getExecutor(
- new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
- INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent, env);
- }
-
- /**
- * Setup <tt>this</tt>.
- * <p>
- * Exposed for TESTING
- */
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
- RegionCoprocessorEnvironment env) {
- this.pool = new WaitForCompletionTaskRunner(pool);
- this.factory = factory;
- this.abortable = new CapturingAbortable(abortable);
- this.stopped = stop;
- }
-
- @Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException {
- Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
- TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
- List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
- for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
- // get the mutations for each table. We leak the implementation here a little bit to save
- // doing a complete copy over of all the index update for each table.
- final List<Mutation> mutations = (List<Mutation>)entry.getValue();
- // track each reference so we can get at it easily later, when determing failures
- final HTableInterfaceReference tableReference = entry.getKey();
- final RegionCoprocessorEnvironment env = this.env;
- if (env != null
- && !allowLocalUpdates
- && tableReference.getTableName().equals(
- env.getRegion().getTableDesc().getNameAsString())) {
- continue;
- }
- tables.add(tableReference);
-
- /*
- * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
- * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
- * The former will only work if we are not in the midst of writing the current batch to the table, though we
- * do check these status variables before starting and before writing the batch. The latter usage,
- * interrupting the thread, will work in the previous situations as was at some points while writing the
- * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
- * elaborate when is supports an interrupt).
- */
- tasks.add(new Task<Boolean>() {
-
- /**
- * Do the actual write to the primary table.
- */
- @SuppressWarnings("deprecation")
- @Override
- public Boolean call() throws Exception {
- HTableInterface table = null;
- try {
- // this may have been queued, but there was an abort/stop so we try to early exit
- throwFailureIfDone();
- if (allowLocalUpdates
- && env != null
- && tableReference.getTableName().equals(
- env.getRegion().getTableDesc().getNameAsString())) {
- try {
- throwFailureIfDone();
- IndexUtil.writeLocalUpdates(env.getRegion(), mutations, true);
- return Boolean.TRUE;
- } catch (IOException ignord) {
- // when it's failed we fall back to the standard & slow way
- if (LOG.isTraceEnabled()) {
- LOG.trace("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
- + ignord);
- }
- }
- }
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
- }
-
- table = factory.getTable(tableReference.get());
- throwFailureIfDone();
- table.batch(mutations);
- } catch (InterruptedException e) {
- // reset the interrupt status on the thread
- Thread.currentThread().interrupt();
- throw e;
- } catch (Exception e) {
- throw e;
- } finally {
- if (table != null) {
- table.close();
- }
- }
- return Boolean.TRUE;
- }
-
- private void throwFailureIfDone() throws SingleIndexWriteFailureException {
- if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
- "Pool closed, not attempting to write to the index!", null); }
-
- }
- });
- }
-
- List<Boolean> results = null;
- try {
- LOG.debug("Waiting on index update tasks to complete...");
- results = this.pool.submitUninterruptible(tasks);
- } catch (ExecutionException e) {
- throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
- } catch (EarlyExitFailure e) {
- throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
- }
-
- // track the failures. We only ever access this on return from our calls, so no extra
- // synchronization is needed. We could update all the failures as we find them, but that add a
- // lot of locking overhead, and just doing the copy later is about as efficient.
- List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
- int index = 0;
- for (Boolean result : results) {
- // there was a failure
- if (result == null) {
- // we know which table failed by the index of the result
- failures.add(tables.get(index));
- }
- index++;
- }
-
- // if any of the tasks failed, then we need to propagate the failure
- if (failures.size() > 0) {
- // make the list unmodifiable to avoid any more synchronization concerns
- throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
- }
- return;
- }
-
- @Override
- public void stop(String why) {
- LOG.info("Shutting down " + this.getClass().getSimpleName());
- this.pool.stop(why);
- this.factory.shutdown();
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped.isStopped();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index ee3b380..0fc138f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
+import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.write.DelegateIndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
@@ -166,7 +167,15 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
// start by looking at all the tables to which we attempted to write
long timestamp = 0;
boolean leaveIndexActive = blockDataTableWritesOnFailure || !disableIndexOnFailure;
+ // if using TrackingParallelWriter, we know which indexes failed and only disable those
+ Set<HTableInterfaceReference> failedTables = cause instanceof MultiIndexWriteFailureException
+ ? new HashSet<HTableInterfaceReference>(((MultiIndexWriteFailureException)cause).getFailedTables())
+ : Collections.<HTableInterfaceReference>emptySet();
+
for (HTableInterfaceReference ref : refs) {
+ if (failedTables.size() > 0 && !failedTables.contains(ref)) {
+ continue; // leave index active if its writes succeeded
+ }
long minTimeStamp = 0;
// get the minimum timestamp across all the mutations we attempted on that table
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 969378d..bc53b6b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -76,6 +76,7 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.LeaveIndexActiveFailurePolicy;
+import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
@@ -142,8 +143,10 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env);
// setup the actual index writer
// For transactional tables, we keep the index active upon a write failure
- // since we have the all versus none behavior for transactions.
- this.writer = new IndexWriter(new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
+ // since we have the all versus none behavior for transactions. Also, we
+ // fail on any write exception since this will end up failing the transaction.
+ this.writer = new IndexWriter(IndexWriter.getCommitter(indexWriterEnv, ParallelWriterIndexCommitter.class),
+ new LeaveIndexActiveFailurePolicy(), indexWriterEnv, serverName + "-tx-index-writer");
}
@Override
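Design note on the hunk above: transactional tables deliberately keep the non-tracking ParallelWriterIndexCommitter rather than the new tracking default. There is no per-table state worth tracking, since any index write exception fails the whole transaction, and LeaveIndexActiveFailurePolicy can safely leave the index active because the transaction's all-or-none semantics guarantee no index row diverged from its data row.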
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index 012f08e..b0e3780 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -131,7 +131,7 @@ public class TestIndexWriter {
tables.put(new ImmutableBytesPtr(tableName), table);
// setup the writer and failure policy
- ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+ TrackingParallelWriterIndexCommitter committer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
committer.setup(factory, exec, abort, stop, e);
KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
policy.setup(stop, abort);
@@ -145,91 +145,6 @@ public class TestIndexWriter {
}
/**
- * Index updates can potentially be queued up if there aren't enough writer threads. If a running
- * index write fails, then we should early exit the pending indexupdate, when it comes up (if the
- * pool isn't already shutdown).
- * <p>
- * This test is a little bit racey - we could actually have the failure of the first task before
- * the third task is even submitted. However, we should never see the third task attempt to make
- * the batch write, so we should never see a failure here.
- * @throws Exception on failure
- */
- @SuppressWarnings({ "unchecked", "deprecation" })
- @Test
- public void testFailureOnRunningUpdateAbortsPending() throws Exception {
- Abortable abort = new StubAbortable();
- Stoppable stop = Mockito.mock(Stoppable.class);
- // single thread factory so the older request gets queued
- ExecutorService exec = Executors.newFixedThreadPool(3);
- Map<ImmutableBytesPtr, HTableInterface> tables = new HashMap<ImmutableBytesPtr, HTableInterface>();
- FakeTableFactory factory = new FakeTableFactory(tables);
- // updates to two different tables
- byte[] tableName = Bytes.add(this.testName.getTableName(), new byte[] { 1, 2, 3, 4 });
- byte[] tableName2 = this.testName.getTableName();// this will sort after the first tablename
- // first table will fail
- HTableInterface table = Mockito.mock(HTableInterface.class);
- Mockito.when(table.batch(Mockito.anyList())).thenThrow(
- new IOException("Intentional IOException for failed first write."));
- Mockito.when(table.getTableName()).thenReturn(tableName);
- RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
- Configuration conf =new Configuration();
- Mockito.when(e.getConfiguration()).thenReturn(conf);
- Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
- // second table just blocks to make sure that the abort propagates to the third task
- final CountDownLatch waitOnAbortedLatch = new CountDownLatch(1);
- final boolean[] failed = new boolean[] { false };
- HTableInterface table2 = Mockito.mock(HTableInterface.class);
- Mockito.when(table2.getTableName()).thenReturn(tableName2);
- Mockito.when(table2.batch(Mockito.anyList())).thenAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- waitOnAbortedLatch.await();
- return null;
- }
- }).thenAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- failed[0] = true;
- throw new RuntimeException(
- "Unexpected exception - second index table shouldn't have been written to");
- }
- });
-
- // add the tables to the set of tables, so its returned to the writer
- tables.put(new ImmutableBytesPtr(tableName), table);
- tables.put(new ImmutableBytesPtr(tableName2), table2);
-
- ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
- committer.setup(factory, exec, abort, stop, e);
- KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
- policy.setup(stop, abort);
- IndexWriter writer = new IndexWriter(committer, policy);
- try {
- Put m = new Put(row);
- m.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), null);
- HTableInterfaceReference ht1 = new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
- HTableInterfaceReference ht2 = new HTableInterfaceReference(new ImmutableBytesPtr(tableName2));
- // We need to apply updates first for table1 and then table2.
- Multimap<HTableInterfaceReference, Mutation> indexUpdates = LinkedListMultimap.create();
- indexUpdates.put(ht1, m);
- indexUpdates.put(ht2, m);
- indexUpdates.put(ht2, m);
- writer.write(indexUpdates, false);
- fail("Should not have successfully completed all index writes");
- } catch (SingleIndexWriteFailureException s) {
- LOG.info("Correctly got a failure to reach the index", s);
- // should have correctly gotten the correct abort, so let the next task execute
- waitOnAbortedLatch.countDown();
- }
- assertFalse(
- "Third set of index writes never have been attempted - should have seen the abort before done!",
- failed[0]);
- writer.stop(this.testName.getTableNameString() + " finished");
- assertTrue("Factory didn't get shutdown after writer#stop!", factory.shutdown);
- assertTrue("ExectorService isn't terminated after writer#stop!", exec.isShutdown());
- }
-
- /**
* Test that if we get an interruption to to the thread while doing a batch (e.g. via shutdown),
* that we correctly end the task
* @throws Exception on failure
@@ -279,7 +194,7 @@ public class TestIndexWriter {
indexUpdates.add(new Pair<Mutation, byte[]>(m, tableName));
// setup the writer
- ParallelWriterIndexCommitter committer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+ TrackingParallelWriterIndexCommitter committer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
committer.setup(factory, exec, abort, stop, e );
KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
policy.setup(stop, abort);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
index e62af7a..3e2b47c 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
@@ -66,7 +66,7 @@ public class TestParalleIndexWriter {
Mockito.when(e.getSharedData()).thenReturn(new ConcurrentHashMap<String,Object>());
FakeTableFactory factory = new FakeTableFactory(
Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
- ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+ TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
Abortable mockAbort = Mockito.mock(Abortable.class);
Stoppable mockStop = Mockito.mock(Stoppable.class);
// create a simple writer
@@ -116,7 +116,7 @@ public class TestParalleIndexWriter {
tables.put(tableName, table);
// setup the writer and failure policy
- ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+ TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
writer.setup(factory, exec, abort, stop, e);
writer.write(indexUpdates, true);
assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
http://git-wip-us.apache.org/repos/asf/phoenix/blob/4354a2cb/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
index 789e7a1..32a6661 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
@@ -62,7 +62,7 @@ public class TestParalleWriterIndexCommitter {
ExecutorService exec = Executors.newFixedThreadPool(1);
FakeTableFactory factory = new FakeTableFactory(
Collections.<ImmutableBytesPtr, HTableInterface> emptyMap());
- ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+ TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
Abortable mockAbort = Mockito.mock(Abortable.class);
Stoppable mockStop = Mockito.mock(Stoppable.class);
RegionCoprocessorEnvironment e =Mockito.mock(RegionCoprocessorEnvironment.class);
@@ -117,7 +117,7 @@ public class TestParalleWriterIndexCommitter {
tables.put(tableName, table);
// setup the writer and failure policy
- ParallelWriterIndexCommitter writer = new ParallelWriterIndexCommitter(VersionInfo.getVersion());
+ TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
writer.setup(factory, exec, abort, stop, e);
writer.write(indexUpdates, true);
assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
[2/4] phoenix git commit: PHOENIX-4233 IndexScrutiny test tool does not work for salted and shared index tables
Posted by ja...@apache.org.
PHOENIX-4233 IndexScrutiny test tool does not work for salted and shared index tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d570c824
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d570c824
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d570c824
Branch: refs/heads/4.x-HBase-1.2
Commit: d570c824c6a2ca46c3b5f47e75097c064914a039
Parents: d3ea324
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Sep 25 22:33:28 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 13:10:40 2017 -0700
----------------------------------------------------------------------
.../apache/phoenix/util/IndexScrutinyIT.java | 4 +-
.../org/apache/phoenix/util/IndexScrutiny.java | 47 ++++++++++++++------
2 files changed, 36 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
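The fix revolves around a column offset: a salted table carries a synthetic leading salt-byte PK column that has no counterpart in the values the scrutiny tool compares, so it must be skipped (shared index tables need analogous handling, not fully visible in this excerpt). A distilled sketch of the salt case, using the PTable accessors the diff relies on:

    List<PColumn> tableColumns = ptable.getColumns();
    List<PColumn> tablePKColumns = ptable.getPKColumns();
    if (ptable.getBucketNum() != null) { // a non-null bucket count means the table is salted
        // Skip the leading salt-byte column before lining up data and index rows.
        tableColumns = tableColumns.subList(1, tableColumns.size());
        tablePKColumns = tablePKColumns.subList(1, tablePKColumns.size());
    }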
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d570c824/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index a5ec83f..3277e32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -35,7 +35,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, SALT_BUCKETS=2");
conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v)");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
@@ -61,7 +61,7 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
+ conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v) INCLUDE (v2)");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb','0')");
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
conn.commit();
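A minimal sketch of how these ITs invoke the scrutiny helper (the signature comes from the IndexScrutiny diff below; conn, fullTableName and fullIndexName are the test fixtures above):

    // Hedged usage sketch: returns the number of index rows scrutinized.
    long rowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);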
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d570c824/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
index c78658d..380e718 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/IndexScrutiny.java
@@ -25,6 +25,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.util.List;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PColumn;
@@ -39,20 +40,39 @@ public class IndexScrutiny {
public static long scrutinizeIndex(Connection conn, String fullTableName, String fullIndexName) throws SQLException {
PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
PTable ptable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+ int tableColumnOffset = 0;
+ List<PColumn> tableColumns = ptable.getColumns();
+ List<PColumn> tablePKColumns = ptable.getPKColumns();
+ if (ptable.getBucketNum() != null) {
+ tableColumnOffset = 1;
+ tableColumns = tableColumns.subList(tableColumnOffset, tableColumns.size());
+ tablePKColumns = tablePKColumns.subList(tableColumnOffset, tablePKColumns.size());
+ }
PTable pindex = pconn.getTable(new PTableKey(pconn.getTenantId(), fullIndexName));
+ List<PColumn> indexColumns = pindex.getColumns();
+ int indexColumnOffset = 0;
+ if (pindex.getBucketNum() != null) {
+ indexColumnOffset = 1;
+ }
+ if (pindex.getViewIndexId() != null) {
+ indexColumnOffset++;
+ }
+ if (indexColumnOffset > 0) {
+ indexColumns = indexColumns.subList(indexColumnOffset, indexColumns.size());
+ }
StringBuilder indexQueryBuf = new StringBuilder("SELECT ");
- for (PColumn dcol : ptable.getPKColumns()) {
+ for (PColumn dcol : tablePKColumns) {
indexQueryBuf.append("CAST(\"" + IndexUtil.getIndexColumnName(dcol) + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
indexQueryBuf.append(",");
}
- for (PColumn icol : pindex.getColumns()) {
+ for (PColumn icol : indexColumns) {
PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
indexQueryBuf.append(",");
}
}
- for (PColumn icol : pindex.getColumns()) {
+ for (PColumn icol : indexColumns) {
if (!SchemaUtil.isPKColumn(icol)) {
PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")");
@@ -63,11 +83,11 @@ public class IndexScrutiny {
indexQueryBuf.append("\nFROM " + fullIndexName);
StringBuilder tableQueryBuf = new StringBuilder("SELECT ");
- for (PColumn dcol : ptable.getPKColumns()) {
+ for (PColumn dcol : tablePKColumns) {
tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
tableQueryBuf.append(",");
}
- for (PColumn icol : pindex.getColumns()) {
+ for (PColumn icol : indexColumns) {
PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
if (dcol.getFamilyName() != null) {
@@ -78,7 +98,7 @@ public class IndexScrutiny {
tableQueryBuf.append(",");
}
}
- for (PColumn icol : pindex.getColumns()) {
+ for (PColumn icol : indexColumns) {
if (!SchemaUtil.isPKColumn(icol)) {
PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
if (dcol.getFamilyName() != null) {
@@ -91,13 +111,13 @@ public class IndexScrutiny {
}
tableQueryBuf.setLength(tableQueryBuf.length()-1);
tableQueryBuf.append("\nFROM " + fullTableName + "\nWHERE (");
- for (PColumn dcol : ptable.getPKColumns()) {
+ for (PColumn dcol : tablePKColumns) {
tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
tableQueryBuf.append(",");
}
tableQueryBuf.setLength(tableQueryBuf.length()-1);
tableQueryBuf.append(") = ((");
- for (int i = 0; i < ptable.getPKColumns().size(); i++) {
+ for (int i = 0; i < tablePKColumns.size(); i++) {
tableQueryBuf.append("?");
tableQueryBuf.append(",");
}
@@ -114,11 +134,12 @@ public class IndexScrutiny {
while (irs.next()) {
icount++;
StringBuilder pkBuf = new StringBuilder("(");
- for (int i = 0; i < ptable.getPKColumns().size(); i++) {
- PColumn dcol = ptable.getPKColumns().get(i);
- Object pkVal = irs.getObject(i+1);
- PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(i + 1));
- istmt.setObject(i+1, pkVal, dcol.getDataType().getSqlType());
+ for (int i = 0; i < tablePKColumns.size(); i++) {
+ PColumn dcol = tablePKColumns.get(i);
+ int offset = i+1;
+ Object pkVal = irs.getObject(offset);
+ PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(offset));
+ istmt.setObject(offset, pkVal, dcol.getDataType().getSqlType());
pkBuf.append(pkType.toStringLiteral(pkVal));
pkBuf.append(",");
}
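In short, the fix skips the hidden leading PK columns that salting and shared (view/local) indexes introduce before data and index rows are compared. A condensed sketch of the offset logic above, using only names from the diff:

    // Salted tables carry a leading salt-byte PK column (getBucketNum() != null);
    // shared indexes additionally carry a view index id column
    // (getViewIndexId() != null). Both must be skipped.
    int indexColumnOffset = 0;
    if (pindex.getBucketNum() != null) indexColumnOffset++;   // salt byte
    if (pindex.getViewIndexId() != null) indexColumnOffset++; // view index id
    List<PColumn> indexColumns = pindex.getColumns();
    if (indexColumnOffset > 0) {
        indexColumns = indexColumns.subList(indexColumnOffset, indexColumns.size());
    }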
[4/4] phoenix git commit: PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)
Posted by ja...@apache.org.
PHOENIX-4214 Scans which write should not block region split or close (Vincent Poon)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6f65a793
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6f65a793
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6f65a793
Branch: refs/heads/4.x-HBase-1.2
Commit: 6f65a7935b640969e570b870de9fa59e2a5bca67
Parents: 4354a2c
Author: James Taylor <jt...@salesforce.com>
Authored: Tue Sep 26 09:31:44 2017 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Tue Sep 26 13:11:12 2017 -0700
----------------------------------------------------------------------
.../UpsertSelectOverlappingBatchesIT.java | 239 +++++++++++++++----
.../UngroupedAggregateRegionObserver.java | 23 +-
2 files changed, 209 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f65a793/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
index 53346b9..fbf3231 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/UpsertSelectOverlappingBatchesIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.execute;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.sql.Connection;
@@ -32,25 +33,43 @@ import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterIT {
-
+ private static final Logger logger = LoggerFactory.getLogger(UpsertSelectOverlappingBatchesIT.class);
+ private Properties props;
+ private static volatile String dataTable;
+ private String index;
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(3);
@@ -60,7 +79,12 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
Map<String,String> clientProps = Maps.newHashMapWithExpectedSize(1);
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
-
+
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ getUtility().shutdownMiniCluster();
+ }
+
private class UpsertSelectRunner implements Callable<Boolean> {
private final String dataTable;
private final int minIndex;
@@ -89,58 +113,185 @@ public class UpsertSelectOverlappingBatchesIT extends BaseUniqueNamesOwnClusterI
return true;
}
}
-
}
-
+
+ private static class UpsertSelectLooper implements Runnable {
+ private UpsertSelectRunner runner;
+ public UpsertSelectLooper(UpsertSelectRunner runner) {
+ this.runner = runner;
+ }
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ runner.call();
+ }
+ catch (Exception e) {
+ if (ExceptionUtils.indexOfThrowable(e, InterruptedException.class) != -1) {
+ logger.info("Interrupted, exiting", e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ logger.error("Hit exception while writing", e);
+ }
+ }
+ }};
+
+ @Before
+ public void setup() throws Exception {
+ SlowBatchRegionObserver.SLOW_MUTATE = false;
+ props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ dataTable = generateUniqueName();
+ index = "IDX_" + dataTable;
+ try (Connection conn = driver.connect(url, props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTable
+ + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+ // create the index and ensure it's empty as well
+ conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
+ conn.setAutoCommit(false);
+ for (int i = 0; i < 100; i++) {
+ stmt.setInt(1, i);
+ stmt.setString(2, "v1" + i);
+ stmt.setString(3, "v2" + i);
+ stmt.execute();
+ }
+ conn.commit();
+ }
+ }
+
@Test
public void testUpsertSelectSameBatchConcurrently() throws Exception {
- final String dataTable = generateUniqueName();
- final String index = "IDX_" + dataTable;
- // create the table and ensure its empty
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = driver.connect(url, props);
- conn.createStatement()
- .execute("CREATE TABLE " + dataTable + " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
- // create the index and ensure its empty as well
- conn.createStatement().execute("CREATE INDEX " + index + " ON " + dataTable + " (v1)");
-
- conn = DriverManager.getConnection(getUrl(), props);
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + dataTable + " VALUES(?,?,?)");
- conn.setAutoCommit(false);
- for (int i = 0; i < 100; i++) {
- stmt.setInt(1, i);
- stmt.setString(2, "v1" + i);
- stmt.setString(3, "v2" + i);
- stmt.execute();
- }
- conn.commit();
-
- int numUpsertSelectRunners = 5;
- ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
- CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
- List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
- // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
- futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
- // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
- for (int i = 0; i < 100; i += 25) {
- futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
- }
- int received = 0;
- while (received < futures.size()) {
- Future<Boolean> resultFuture = completionService.take();
- Boolean result = resultFuture.get();
- received++;
- assertTrue(result);
+ try (Connection conn = driver.connect(url, props)) {
+ int numUpsertSelectRunners = 5;
+ ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+ CompletionService<Boolean> completionService = new ExecutorCompletionService<Boolean>(exec);
+ List<Future<Boolean>> futures = Lists.newArrayListWithExpectedSize(numUpsertSelectRunners);
+ // run one UPSERT SELECT for 100 rows (that locks the rows for a long time)
+ futures.add(completionService.submit(new UpsertSelectRunner(dataTable, 0, 105, 1)));
+ // run four UPSERT SELECTS for 5 rows (that overlap with slow running UPSERT SELECT)
+ for (int i = 0; i < 100; i += 25) {
+ futures.add(completionService.submit(new UpsertSelectRunner(dataTable, i, i+25, 5)));
+ }
+ int received = 0;
+ while (received < futures.size()) {
+ Future<Boolean> resultFuture = completionService.take();
+ Boolean result = resultFuture.get();
+ received++;
+ assertTrue(result);
+ }
+ exec.shutdownNow();
}
- exec.shutdownNow();
- conn.close();
}
+
+ /**
+ * Tests that splitting a region is not blocked indefinitely by UPSERT SELECT load
+ */
+ @Test
+ public void testSplitDuringUpsertSelect() throws Exception {
+ int numUpsertSelectRunners = 4;
+ ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+ try (Connection conn = driver.connect(url, props)) {
+ final UpsertSelectRunner upsertSelectRunner =
+ new UpsertSelectRunner(dataTable, 0, 105, 1);
+ // keep running slow upsert selects
+ SlowBatchRegionObserver.SLOW_MUTATE = true;
+ for (int i = 0; i < numUpsertSelectRunners; i++) {
+ exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+ Thread.sleep(300);
+ }
+
+ // keep trying to split the region
+ final HBaseTestingUtility utility = getUtility();
+ final HBaseAdmin admin = utility.getHBaseAdmin();
+ final TableName dataTN = TableName.valueOf(dataTable);
+ assertEquals(1, utility.getHBaseCluster().getRegions(dataTN).size());
+ utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ try {
+ List<HRegionInfo> regions = admin.getTableRegions(dataTN);
+ if (regions.size() > 1) {
+ logger.info("Found region was split");
+ return true;
+ }
+ if (regions.size() == 0) {
+ // This happens when the region is in transition or closed
+ logger.info("No region returned");
+ return false;
+ }
+ ;
+ HRegionInfo hRegion = regions.get(0);
+ logger.info("Attempting to split region");
+ admin.splitRegion(hRegion.getRegionName(), Bytes.toBytes(2));
+ return false;
+ } catch (NotServingRegionException nsre) {
+ // during split
+ return false;
+ }
+ }
+ });
+ } finally {
+ SlowBatchRegionObserver.SLOW_MUTATE = false;
+ exec.shutdownNow();
+ exec.awaitTermination(60, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * Tests that UPSERT SELECT doesn't indefinitely block region closes
+ */
+ @Test
+ public void testRegionCloseDuringUpsertSelect() throws Exception {
+ int numUpsertSelectRunners = 4;
+ ExecutorService exec = Executors.newFixedThreadPool(numUpsertSelectRunners);
+ try (Connection conn = driver.connect(url, props)) {
+ final UpsertSelectRunner upsertSelectRunner =
+ new UpsertSelectRunner(dataTable, 0, 105, 1);
+ // keep running slow upsert selects
+ SlowBatchRegionObserver.SLOW_MUTATE = true;
+ for (int i = 0; i < numUpsertSelectRunners; i++) {
+ exec.submit(new UpsertSelectLooper(upsertSelectRunner));
+ Thread.sleep(300);
+ }
+
+ final HBaseTestingUtility utility = getUtility();
+ // try to close the region while UPSERT SELECTs are happening
+ final HRegionServer dataRs = utility.getHBaseCluster().getRegionServer(0);
+ final HBaseAdmin admin = utility.getHBaseAdmin();
+ final HRegionInfo dataRegion =
+ admin.getTableRegions(TableName.valueOf(dataTable)).get(0);
+ logger.info("Closing data table region");
+ admin.closeRegion(dataRs.getServerName(), dataRegion);
+ // make sure the region is offline
+ utility.waitFor(30000L, 1000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ List<HRegionInfo> onlineRegions =
+ admin.getOnlineRegions(dataRs.getServerName());
+ for (HRegionInfo onlineRegion : onlineRegions) {
+ if (onlineRegion.equals(dataRegion)) {
+ logger.info("Data region still online");
+ return false;
+ }
+ }
+ logger.info("Region is no longer online");
+ return true;
+ }
+ });
+ } finally {
+ SlowBatchRegionObserver.SLOW_MUTATE = false;
+ exec.shutdownNow();
+ exec.awaitTermination(60, TimeUnit.SECONDS);
+ }
+ }
public static class SlowBatchRegionObserver extends SimpleRegionObserver {
+ public static volatile boolean SLOW_MUTATE = false;
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
// model a slow batch that takes a long time
- if (miniBatchOp.size()==100) {
+ if ((miniBatchOp.size()==100 || SLOW_MUTATE) && c.getEnvironment().getRegionInfo().getTable().getNameAsString().equals(dataTable)) {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
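The new tests drive this observer through the volatile SLOW_MUTATE toggle. A minimal sketch of the pattern (names from the diff above; the body of the try block stands in for the split/close test logic):

    // Hedged sketch: flip the flag so every batch mutation on the data table
    // sleeps, and always reset it in finally so later tests are unaffected.
    SlowBatchRegionObserver.SLOW_MUTATE = true;
    try {
        // submit UpsertSelectLoopers, then attempt the region split or close
    } finally {
        SlowBatchRegionObserver.SLOW_MUTATE = false;
    }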
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6f65a793/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0773ebc..a2a1b5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -164,7 +164,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
/**
* This lock is used for synchronizing the state of
* {@link UngroupedAggregateRegionObserver#scansReferenceCount},
- * {@link UngroupedAggregateRegionObserver#isRegionClosing} variables used to avoid possible
+ * {@link UngroupedAggregateRegionObserver#isRegionClosingOrSplitting} variables, used to avoid a possible
* deadlock in the following scenario:
* 1. We take the read lock when we start writing local indexes, deletes etc.
* 2. When the memstore reaches its threshold, flushes happen. Since they use the read (shared) lock they
@@ -191,7 +191,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@GuardedBy("lock")
private int scansReferenceCount = 0;
@GuardedBy("lock")
- private boolean isRegionClosing = false;
+ private boolean isRegionClosingOrSplitting = false;
private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
private KeyValueBuilder kvBuilder;
private Configuration upsertSelectConfig;
@@ -285,7 +285,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
*/
private void checkForRegionClosing() throws IOException {
synchronized (lock) {
- if(isRegionClosing) {
+ if(isRegionClosingOrSplitting) {
lock.notifyAll();
throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
}
@@ -499,13 +499,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
useIndexProto = false;
}
boolean acquiredLock = false;
- try {
- if(needToWrite) {
- synchronized (lock) {
- scansReferenceCount++;
- lock.notifyAll();
+ if(needToWrite) {
+ synchronized (lock) {
+ if (isRegionClosingOrSplitting) {
+ throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
}
+ scansReferenceCount++;
+ lock.notifyAll();
}
+ }
+ try {
region.startRegionOperation();
acquiredLock = true;
synchronized (innerScanner) {
@@ -1295,6 +1298,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// Don't allow splitting if operations that need to read and write to the same region are going on in
// the coprocessors, to avoid a deadlock scenario. See PHOENIX-3111.
synchronized (lock) {
+ isRegionClosingOrSplitting = true;
if (scansReferenceCount > 0) {
throw new IOException("Operations like local index building/delete/upsert select"
+ " might be going on so not allowing to split.");
@@ -1319,12 +1323,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
throws IOException {
synchronized (lock) {
- isRegionClosing = true;
+ isRegionClosingOrSplitting = true;
while (scansReferenceCount > 0) {
try {
lock.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting for completion of operations like local index building/delete/upsert select");
}
}
}
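Taken together, the coprocessor changes implement a reference-count-plus-flag guard: a scan that needs to write registers under the lock, close/split first sets the flag so new writers fail fast, and preClose then waits for in-flight writers to drain (preSplit instead throws immediately if any are active). A condensed, hedged sketch of the pattern using the names from the diff; the matching decrement and notifyAll on scan completion sits outside the hunks shown:

    // Condensed sketch, not the full coprocessor.
    private final Object lock = new Object();
    private int scansReferenceCount = 0;           // in-flight writing scans
    private boolean isRegionClosingOrSplitting = false;

    // Before a scan starts writing (the needToWrite block above).
    void register() throws IOException {
        synchronized (lock) {
            if (isRegionClosingOrSplitting) {
                throw new IOException("Temporarily unable to write from scan"
                        + " because region is closing or splitting");
            }
            scansReferenceCount++;
            lock.notifyAll();
        }
    }

    // From preClose: block new writers, then drain existing ones.
    void blockAndDrain() throws IOException {
        synchronized (lock) {
            isRegionClosingOrSplitting = true;
            while (scansReferenceCount > 0) {
                try {
                    lock.wait(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for"
                            + " writing scans to drain");
                }
            }
        }
    }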