You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by pb...@apache.org on 2018/04/22 10:10:26 UTC
[1/2] phoenix git commit: PHOENIX-4613 Thread clientVersion through
to IndexCommitter implementors
Repository: phoenix
Updated Branches:
refs/heads/4.x-cdh5.11 9948036b6 -> 8bba23976
PHOENIX-4613 Thread clientVersion through to IndexCommitter implementors
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1bfe303f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1bfe303f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1bfe303f
Branch: refs/heads/4.x-cdh5.11
Commit: 1bfe303f6a861feea0d23e64bb1ec93d5db69a1f
Parents: 9948036
Author: Vincent Poon <vi...@apache.org>
Authored: Fri Apr 20 21:35:06 2018 +0100
Committer: Pedro Boado <pb...@apache.org>
Committed: Sun Apr 22 11:07:09 2018 +0100
----------------------------------------------------------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 16 ++++++++++-----
.../hbase/index/builder/IndexBuildManager.java | 4 ++++
.../hbase/index/covered/IndexMetaData.java | 12 ++++++++++-
.../hbase/index/write/IndexCommitter.java | 2 +-
.../phoenix/hbase/index/write/IndexWriter.java | 21 ++++++++++----------
.../write/ParallelWriterIndexCommitter.java | 2 +-
.../hbase/index/write/RecoveryIndexWriter.java | 4 ++--
.../TrackingParallelWriterIndexCommitter.java | 2 +-
.../index/PhoenixTransactionalIndexer.java | 9 +++++++--
.../index/covered/LocalTableStateTest.java | 18 ++++++++++++++++-
.../hbase/index/write/TestIndexWriter.java | 7 +++----
.../index/write/TestParalleIndexWriter.java | 4 +++-
.../write/TestParalleWriterIndexCommitter.java | 3 ++-
13 files changed, 74 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index c7dbff2..1ef09fe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -91,6 +91,7 @@ import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import com.google.common.collect.Lists;
@@ -132,8 +133,13 @@ public class Indexer extends BaseRegionObserver {
// Hack to get around not being able to save any state between
// coprocessor calls. TODO: remove after HBASE-18127 when available
private static class BatchMutateContext {
+ public final int clientVersion;
public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList();
public List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+
+ public BatchMutateContext(int clientVersion) {
+ this.clientVersion = clientVersion;
+ }
}
private ThreadLocal<BatchMutateContext> batchMutateContext =
@@ -399,7 +405,7 @@ public class Indexer extends BaseRegionObserver {
* Exclusively lock all rows so we get a consistent read
* while determining the index updates
*/
- BatchMutateContext context = new BatchMutateContext();
+ BatchMutateContext context = new BatchMutateContext(this.builder.getIndexMetaData(miniBatchOp).getClientVersion());
setBatchMutateContext(c, context);
Durability durability = Durability.SKIP_WAL;
boolean copyMutations = false;
@@ -627,10 +633,10 @@ public class Indexer extends BaseRegionObserver {
}
}
if(!remoteUpdates.isEmpty()) {
- writer.writeAndKillYourselfOnFailure(remoteUpdates, false);
+ writer.writeAndKillYourselfOnFailure(remoteUpdates, false, context.clientVersion);
}
if(!localUpdates.isEmpty()) {
- writer.writeAndKillYourselfOnFailure(localUpdates, true);
+ writer.writeAndKillYourselfOnFailure(localUpdates, true, context.clientVersion);
}
long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
@@ -700,7 +706,7 @@ public class Indexer extends BaseRegionObserver {
// do the usual writer stuff, killing the server again, if we can't manage to make the index
// writes succeed again
try {
- writer.writeAndKillYourselfOnFailure(updates, true);
+ writer.writeAndKillYourselfOnFailure(updates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
} catch (IOException e) {
LOG.error("During WAL replay of outstanding index updates, "
+ "Exception is thrown instead of killing server during index writing", e);
@@ -738,7 +744,7 @@ public class Indexer extends BaseRegionObserver {
* hopes they come up before the primary table finishes.
*/
Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(logEdit);
- recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates, true);
+ recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
} finally {
long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
if (duration >= slowPreWALRestoreThreshold) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 4c410ad..2550dd1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -73,6 +73,10 @@ public class IndexBuildManager implements Stoppable {
}
}
+ public IndexMetaData getIndexMetaData(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ return this.delegate.getIndexMetaData(miniBatchOp);
+ }
+
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations) throws Throwable {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
index 20ed855..18b515c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexMetaData.java
@@ -19,8 +19,10 @@ package org.apache.phoenix.hbase.index.covered;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
+import org.apache.phoenix.util.ScanUtil;
public interface IndexMetaData {
+
public static final IndexMetaData NULL_INDEX_META_DATA = new IndexMetaData() {
@Override
@@ -31,7 +33,13 @@ public interface IndexMetaData {
@Override
public ReplayWrite getReplayWrite() {
return null;
- }};
+ }
+
+ @Override
+ public int getClientVersion() {
+ return ScanUtil.UNKNOWN_CLIENT_VERSION;
+ }
+ };
/**
@@ -42,4 +50,6 @@ public interface IndexMetaData {
public boolean requiresPriorRowState(Mutation m);
public ReplayWrite getReplayWrite();
+
+ public int getClientVersion();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
index 5e3f3ed..e9dc202 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
@@ -32,6 +32,6 @@ public interface IndexCommitter extends Stoppable {
void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates)
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates, int clientVersion)
throws IndexWriteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
index 4e5e182..c28288c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -135,13 +135,14 @@ public class IndexWriter implements Stoppable {
* which ensures that the server crashes when an index write fails, ensuring that we get WAL
* replay of the index edits.
* @param indexUpdates Updates to write
+ * @param clientVersion version of the client
* @throws IOException
*/
public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates,
- boolean allowLocalUpdates) throws IOException {
+ boolean allowLocalUpdates, int clientVersion) throws IOException {
// convert the strings to htableinterfaces to which we can talk and group by TABLE
Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
- writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates);
+ writeAndKillYourselfOnFailure(toWrite, allowLocalUpdates, clientVersion);
}
/**
@@ -150,9 +151,9 @@ public class IndexWriter implements Stoppable {
* @throws IOException
*/
public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite,
- boolean allowLocalUpdates) throws IOException {
+ boolean allowLocalUpdates, int clientVersion) throws IOException {
try {
- write(toWrite, allowLocalUpdates);
+ write(toWrite, allowLocalUpdates, clientVersion);
if (LOG.isTraceEnabled()) {
LOG.trace("Done writing all index updates!\n\t" + toWrite);
}
@@ -176,12 +177,12 @@ public class IndexWriter implements Stoppable {
* @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
* stop early depends on the {@link IndexCommitter}.
*/
- public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
- write(resolveTableReferences(toWrite), false);
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite, int clientVersion) throws IndexWriteException {
+ write(resolveTableReferences(toWrite), false, clientVersion);
}
- public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IOException {
- write(resolveTableReferences(toWrite), allowLocalUpdates);
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates, int clientVersion) throws IOException {
+ write(resolveTableReferences(toWrite), allowLocalUpdates, clientVersion);
}
/**
@@ -189,9 +190,9 @@ public class IndexWriter implements Stoppable {
* @param toWrite
* @throws IndexWriteException
*/
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates)
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, boolean allowLocalUpdates, int clientVersion)
throws IndexWriteException {
- this.writer.write(toWrite, allowLocalUpdates);
+ this.writer.write(toWrite, allowLocalUpdates, clientVersion);
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index 0bb8784..aba2678 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -94,7 +94,7 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
}
@Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws SingleIndexWriteFailureException {
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates, final int clientVersion) throws SingleIndexWriteFailureException {
/*
* This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
* parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
index e340784..35f0a6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/RecoveryIndexWriter.java
@@ -69,9 +69,9 @@ public class RecoveryIndexWriter extends IndexWriter {
}
@Override
- public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates) throws IOException {
+ public void write(Collection<Pair<Mutation, byte[]>> toWrite, boolean allowLocalUpdates, int clientVersion) throws IOException {
try {
- write(resolveTableReferences(toWrite), allowLocalUpdates);
+ write(resolveTableReferences(toWrite), allowLocalUpdates, clientVersion);
} catch (MultiIndexWriteFailureException e) {
for (HTableInterfaceReference table : e.getFailedTables()) {
if (!admin.tableExists(table.getTableName())) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index 94d4f0f..4dbad63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -115,7 +115,7 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
}
@Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates) throws MultiIndexWriteFailureException {
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite, final boolean allowLocalUpdates, final int clientVersion) throws MultiIndexWriteFailureException {
Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 405fc0c..610ea44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -70,6 +70,11 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
// coprocessor calls. TODO: remove after HBASE-18127 when available
private static class BatchMutateContext {
public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList();
+ public final int clientVersion;
+
+ public BatchMutateContext(int clientVersion) {
+ this.clientVersion = clientVersion;
+ }
}
private ThreadLocal<BatchMutateContext> batchMutateContext =
@@ -155,7 +160,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
super.preBatchMutate(c, miniBatchOp);
return;
}
- BatchMutateContext context = new BatchMutateContext();
+ BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
setBatchMutateContext(c, context);
Collection<Pair<Mutation, byte[]>> indexUpdates = null;
@@ -204,7 +209,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
if (success) { // if miniBatchOp was successfully written, write index updates
if (!context.indexUpdates.isEmpty()) {
- this.writer.write(context.indexUpdates, true);
+ this.writer.write(context.indexUpdates, true, context.clientVersion);
}
current.addTimelineAnnotation("Wrote index updates");
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
index c7e1769..56ba1d6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/LocalTableStateTest.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.hbase.index.covered.data.LocalTable;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
+import org.apache.phoenix.util.ScanUtil;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -67,6 +68,11 @@ public class LocalTableStateTest {
return true;
}
+ @Override
+ public int getClientVersion() {
+ return ScanUtil.UNKNOWN_CLIENT_VERSION;
+ }
+
};
@SuppressWarnings("unchecked")
@@ -130,7 +136,12 @@ public class LocalTableStateTest {
return true;
}
- };
+ @Override
+ public int getClientVersion() {
+ return ScanUtil.UNKNOWN_CLIENT_VERSION;
+ }
+
+ };
Put m = new Put(row);
m.addColumn(fam, qual, ts, val);
// setup mocks
@@ -167,6 +178,11 @@ public class LocalTableStateTest {
return false;
}
+ @Override
+ public int getClientVersion() {
+ return ScanUtil.UNKNOWN_CLIENT_VERSION;
+ }
+
};
Put m = new Put(row);
m.addColumn(fam, qual, ts, val);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
index a25f7cf..58050c1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestIndexWriter.java
@@ -17,10 +17,8 @@
*/
package org.apache.phoenix.hbase.index.write;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Arrays;
@@ -51,6 +49,7 @@ import org.apache.phoenix.hbase.index.StubAbortable;
import org.apache.phoenix.hbase.index.TableName;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.ScanUtil;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
@@ -134,7 +133,7 @@ public class TestIndexWriter {
KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
policy.setup(stop, abort);
IndexWriter writer = new IndexWriter(committer, policy);
- writer.write(indexUpdates);
+ writer.write(indexUpdates, ScanUtil.UNKNOWN_CLIENT_VERSION);
assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
completed[0]);
writer.stop(this.testName.getTableNameString() + " finished");
@@ -208,7 +207,7 @@ public class TestIndexWriter {
@Override
public void run() {
try {
- writer.write(indexUpdates);
+ writer.write(indexUpdates, ScanUtil.UNKNOWN_CLIENT_VERSION);
} catch (IndexWriteException e) {
failedWrite[0] = true;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
index cd29e10..55c3fb3 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleIndexWriter.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.phoenix.hbase.index.StubAbortable;
import org.apache.phoenix.hbase.index.TableName;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.ScanUtil;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
@@ -124,7 +126,7 @@ public class TestParalleIndexWriter {
// setup the writer and failure policy
TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
writer.setup(factory, exec, abort, stop, e);
- writer.write(indexUpdates, true);
+ writer.write(indexUpdates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
completed[0]);
writer.stop(this.test.getTableNameString() + " finished");
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1bfe303f/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
index 32ae108..9767eae 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestParalleWriterIndexCommitter.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.hbase.index.StubAbortable;
import org.apache.phoenix.hbase.index.TableName;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.util.ScanUtil;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
@@ -125,7 +126,7 @@ public class TestParalleWriterIndexCommitter {
// setup the writer and failure policy
TrackingParallelWriterIndexCommitter writer = new TrackingParallelWriterIndexCommitter(VersionInfo.getVersion());
writer.setup(factory, exec, abort, stop, e);
- writer.write(indexUpdates, true);
+ writer.write(indexUpdates, true, ScanUtil.UNKNOWN_CLIENT_VERSION);
assertTrue("Writer returned before the table batch completed! Likely a race condition tripped",
completed[0]);
writer.stop(this.test.getTableNameString() + " finished");
[2/2] phoenix git commit: PHOENIX-4601 Perform server-side retries if
client version < 4.14
Posted by pb...@apache.org.
PHOENIX-4601 Perform server-side retries if client version < 4.14
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8bba2397
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8bba2397
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8bba2397
Branch: refs/heads/4.x-cdh5.11
Commit: 8bba23976be4b5d4f0139660831532ac4ca23f9e
Parents: 1bfe303
Author: Vincent Poon <vi...@apache.org>
Authored: Fri Apr 20 22:22:10 2018 +0100
Committer: Pedro Boado <pb...@apache.org>
Committed: Sun Apr 22 11:07:22 2018 +0100
----------------------------------------------------------------------
.../hbase/index/write/IndexWriterUtils.java | 36 ++++++++++++++++----
.../write/ParallelWriterIndexCommitter.java | 12 +++++--
.../TrackingParallelWriterIndexCommitter.java | 13 ++++---
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 1 +
4 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bba2397/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
index 76d6800..0d3004f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.util.PropertiesUtil;
@@ -58,15 +59,19 @@ public class IndexWriterUtils {
public static final String INDEX_WRITES_THREAD_MAX_PER_REGIONSERVER_KEY = "phoenix.index.writes.threads.max";
public static final String HTABLE_KEEP_ALIVE_KEY = "hbase.htable.threads.keepalivetime";
+ @Deprecated
public static final String INDEX_WRITER_RPC_RETRIES_NUMBER = "phoenix.index.writes.rpc.retries.number";
- /**
- * Retry server-server index write rpc only once, and let the client retry the data write
- * instead to avoid typing up the handler
- */
- // note in HBase 2+, numTries = numRetries + 1
- // in prior versions, numTries = numRetries
- public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 1;
+ /**
+ * Based on the logic in HBase's AsyncProcess, a default of 11 retries with a pause of 100ms
+ * approximates 48 sec total retry time (factoring in backoffs). The total time should be less
+ * than HBase's rpc timeout (default of 60 sec) or else the client will retry before receiving
+ * the response
+ */
+ @Deprecated
+ public static final int DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER = 11;
+ @Deprecated
public static final String INDEX_WRITER_RPC_PAUSE = "phoenix.index.writes.rpc.pause";
+ @Deprecated
public static final int DEFAULT_INDEX_WRITER_RPC_PAUSE = 100;
private IndexWriterUtils() {
@@ -76,12 +81,29 @@ public class IndexWriterUtils {
public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
// create a simple delegate factory, setup the way we need
Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration());
+ setHTableThreads(conf);
+ return ServerUtil.getDelegateHTableFactory(env, conf);
+ }
+
+ private static void setHTableThreads(Configuration conf) {
// set the number of threads allowed per table.
int htableThreads =
conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY,
IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
+ }
+
+ /**
+ * Retry server-server index write rpc only once, and let the client retry the data write
+ * instead to avoid tying up the handler
+ */
+ public static HTableFactory getNoRetriesHTableFactory(CoprocessorEnvironment env) {
+ Configuration conf = PropertiesUtil.cloneConfig(env.getConfiguration());
+ setHTableThreads(conf);
+ // note in HBase 2+, numTries = numRetries + 1
+ // in prior versions, numTries = numRetries
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
return ServerUtil.getDelegateHTableFactory(env, conf);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bba2397/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index aba2678..e06efcc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.util.IndexUtil;
import com.google.common.collect.Multimap;
@@ -57,7 +58,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
public static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
- private HTableFactory factory;
+ private HTableFactory retryingFactory;
+ private HTableFactory noRetriesfactory;
private Stoppable stopped;
private QuickFailingTaskRunner pool;
private KeyValueBuilder kvBuilder;
@@ -88,7 +90,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
* Exposed for TESTING
*/
void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, RegionCoprocessorEnvironment env) {
- this.factory = factory;
+ this.retryingFactory = factory;
+ this.noRetriesfactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
this.pool = new QuickFailingTaskRunner(pool);
this.stopped = stop;
}
@@ -162,6 +165,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
}
}
}
+ // if the client can retry index writes, then we don't need to retry here
+ HTableFactory factory = clientVersion < PhoenixDatabaseMetaData.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesfactory;
table = factory.getTable(tableReference.get());
throwFailureIfDone();
table.batch(mutations);
@@ -226,7 +231,8 @@ public class ParallelWriterIndexCommitter implements IndexCommitter {
public void stop(String why) {
LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
this.pool.stop(why);
- this.factory.shutdown();
+ this.retryingFactory.shutdown();
+ this.noRetriesfactory.shutdown();
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bba2397/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
index 4dbad63..4ba1155 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/TrackingParallelWriterIndexCommitter.java
@@ -41,6 +41,7 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.util.IndexUtil;
import com.google.common.collect.Multimap;
@@ -74,7 +75,8 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
private TaskRunner pool;
- private HTableFactory factory;
+ private HTableFactory retryingFactory;
+ private HTableFactory noRetriesFactory;
private CapturingAbortable abortable;
private Stoppable stopped;
private RegionCoprocessorEnvironment env;
@@ -108,7 +110,8 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
RegionCoprocessorEnvironment env) {
this.pool = new WaitForCompletionTaskRunner(pool);
- this.factory = factory;
+ this.retryingFactory = factory;
+ this.noRetriesFactory = IndexWriterUtils.getNoRetriesHTableFactory(env);
this.abortable = new CapturingAbortable(abortable);
this.stopped = stop;
this.env = env;
@@ -175,7 +178,8 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
if (LOG.isTraceEnabled()) {
LOG.trace("Writing index update:" + mutations + " to table: " + tableReference);
}
-
+ // if the client can retry index writes, then we don't need to retry here
+ HTableFactory factory = clientVersion < PhoenixDatabaseMetaData.MIN_CLIENT_RETRY_INDEX_WRITES ? retryingFactory : noRetriesFactory;
table = factory.getTable(tableReference.get());
throwFailureIfDone();
table.batch(mutations);
@@ -238,7 +242,8 @@ public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
public void stop(String why) {
LOG.info("Shutting down " + this.getClass().getSimpleName());
this.pool.stop(why);
- this.factory.shutdown();
+ this.retryingFactory.shutdown();
+ this.noRetriesFactory.shutdown();
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bba2397/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index add0628..d56628a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -327,6 +327,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final int MIN_NAMESPACE_MAPPED_PHOENIX_VERSION = VersionUtil.encodeVersion("4", "8", "0");
public static final int MIN_PENDING_ACTIVE_INDEX = VersionUtil.encodeVersion("4", "12", "0");
public static final int MIN_PENDING_DISABLE_INDEX = VersionUtil.encodeVersion("4", "14", "0");
+ public static final int MIN_CLIENT_RETRY_INDEX_WRITES = VersionUtil.encodeVersion("4", "14", "0");
public static final int MIN_TX_CLIENT_SIDE_MAINTENANCE = VersionUtil.encodeVersion("4", "14", "0");
// Version below which we should turn off essential column family.