You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vi...@apache.org on 2019/02/04 22:45:31 UTC
[phoenix] branch master updated: PHOENIX-5080 Index becomes Active
during Partial Index Rebuilder if Index Failure happens
This is an automated email from the ASF dual-hosted git repository.
vincentpoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 2ace905 PHOENIX-5080 Index becomes Active during Partial Index Rebuilder if Index Failure happens
2ace905 is described below
commit 2ace9051c3d01dc599ba6b1a362ec7f84500efa5
Author: Monani Mihir <mo...@gmail.com>
AuthorDate: Wed Jan 30 00:43:32 2019 +0530
PHOENIX-5080 Index becomes Active during Partial Index Rebuilder if Index Failure happens
---
.../end2end/index/PartialIndexRebuilderIT.java | 63 ++++++++++++++++++++++
.../coprocessor/BaseScannerRegionObserver.java | 9 +++-
.../UngroupedAggregateRegionObserver.java | 25 +++++++--
.../org/apache/phoenix/execute/MutationState.java | 14 ++++-
.../org/apache/phoenix/hbase/index/Indexer.java | 11 +---
.../hbase/index/builder/IndexBuildManager.java | 8 +++
.../phoenix/index/PhoenixIndexFailurePolicy.java | 32 +++++++++--
.../apache/phoenix/index/PhoenixIndexMetaData.java | 3 +-
.../java/org/apache/phoenix/query/BaseTest.java | 25 +++++++++
9 files changed, 169 insertions(+), 21 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index b9cbe30..5bd41fc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -86,6 +86,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
private static final long REBUILD_PERIOD = 50000;
private static final long REBUILD_INTERVAL = 2000;
private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
+ private static volatile boolean runRebuildOnce = true; // cleared by the async rebuilder thread, polled by the test thread; volatile makes the write visible
@BeforeClass
@@ -125,6 +126,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
runIndexRebuilderAsync(interval, cancel, Collections.<String>singletonList(table));
}
private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel, final List<String> tables) {
+ runRebuildOnce = true;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
@@ -137,6 +139,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
throw new RuntimeException(e);
} catch (SQLException e) {
LOG.error(e.getMessage(),e);
+ } finally {
+ runRebuildOnce = false;
}
}
}
@@ -1040,6 +1044,65 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
}
+ @Test
+ @Repeat(5)
+ public void testIndexActiveIfRegionMovesWhileRebuilding() throws Throwable { // index may only go INACTIVE -> ACTIVE while an index region is unassigned/re-assigned mid-rebuild
+ final MyClock clock = new MyClock(1000);
+ EnvironmentEdgeManager.injectEdge(clock); // test-controlled clock; removed again in the finally block below
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ int nThreads = 5;
+ int nRows = 50;
+ int nIndexValues = 23;
+ int batchSize = 200;
+ final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+ boolean[] cancel = new boolean[1];
+
+ final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ try {
+ conn.createStatement().execute("CREATE TABLE " + fullTableName
+ + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, "
+ + "CONSTRAINT pk PRIMARY KEY (k1,k2)) STORE_NULLS=true, VERSIONS=1");
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON "
+ + fullTableName + "(v1)");
+ conn.commit();
+ long disableTS = clock.currentTime();
+ Table metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+ .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ IndexUtil.updateIndexState(fullIndexName, disableTS,
+ metaTable, PIndexState.DISABLE); // start from a DISABLE index stamped with disableTS
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName,
+ PIndexState.DISABLE, disableTS));
+ mutateRandomly(fullTableName, nThreads, nRows,
+ nIndexValues, batchSize, doneSignal);
+ assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS)); // presumably counted down by the mutateRandomly workers - confirm
+ runIndexRebuilder(fullTableName); // first pass: the index is expected to become INACTIVE (asserted next)
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName,
+ PIndexState.INACTIVE, disableTS));
+ clock.time += WAIT_AFTER_DISABLED; // advance the injected clock
+ runIndexRebuilderAsync(500,cancel,fullTableName); // second rebuild runs on a background thread
+ unassignRegionAsync(fullIndexName); // concurrently unassign/re-assign the first region of the index table
+ while (runRebuildOnce) { // flag is cleared in the finally block of the background rebuilder thread
+ PIndexState indexState = TestUtil.getIndexState(conn, fullIndexName);
+ if (indexState != PIndexState.INACTIVE && indexState != PIndexState.ACTIVE) { // any other state fails the test
+ cancel[0] = true;
+ throw new Exception("Index State should not transtion from INACTIVE to "
+ + indexState);
+ }
+ }
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); // rebuild finished: index must be ACTIVE
+ } finally {
+ cancel[0] = true;
+ EnvironmentEdgeManager.injectEdge(null);
+ }
+ long totalRows = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+ assertEquals(nRows, totalRows); // the scrutiny must report exactly nRows rows
+ }
+ }
+
public static class WriteFailingRegionObserver extends SimpleRegionObserver {
@Override
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 725e792..dc95235 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -103,10 +103,14 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
+ // In case of an index write failure, we need to determine whether the index mutation
+ // is part of a normal client write or of the Index Rebuilder (PHOENIX-5080).
+ public final static byte[] REPLAY_INDEX_REBUILD_WRITES = PUnsignedTinyint.INSTANCE.toBytes(3);
public enum ReplayWrite {
TABLE_AND_INDEX,
- INDEX_ONLY;
+ INDEX_ONLY,
+ REBUILD_INDEX_ONLY;
public static ReplayWrite fromBytes(byte[] replayWriteBytes) {
if (replayWriteBytes == null) {
@@ -118,6 +122,9 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
if (Bytes.compareTo(REPLAY_ONLY_INDEX_WRITES, replayWriteBytes) == 0) {
return INDEX_ONLY;
}
+ if (Bytes.compareTo(REPLAY_INDEX_REBUILD_WRITES, replayWriteBytes) == 0) {
+ return REBUILD_INDEX_ONLY;
+ }
throw new IllegalArgumentException("Unknown ReplayWrite code of " + Bytes.toStringBinary(replayWriteBytes));
}
};
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 36717fb..3be4d36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -105,6 +105,7 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
@@ -262,6 +263,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public void doMutation() throws IOException {
commitBatch(region, localRegionMutations, blockingMemstoreSize);
}
+
+ @Override
+ public List<Mutation> getMutationList() {
+ return localRegionMutations; // the same list that doMutation() above commits
+ }
});
}
}
@@ -944,6 +950,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
public void doMutation() throws IOException {
commitBatchWithHTable(targetHTable, remoteRegionMutations);
}
+
+ @Override
+ public List<Mutation> getMutationList() {
+ return remoteRegionMutations; // the same list that doMutation() above writes to targetHTable
+ }
});
}
localRegionMutations.clear();
@@ -958,7 +969,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// For an index write failure, the data table write succeeded,
// so when we retry we need to set REPLAY_WRITES
for (Mutation mutation : localRegionMutations) {
- mutation.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+ if (PhoenixIndexMetaData.isIndexRebuild(mutation.getAttributesMap())) {
+ mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+ } else {
+ mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+ }
// use the server timestamp for index write retries
PhoenixKeyValueUtil.setTimestamp(mutation, serverTimestamp);
}
@@ -1089,7 +1106,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
put = new Put(CellUtil.cloneRow(cell));
put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- put.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+ put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
mutations.add(put);
// Since we're replaying existing mutations, it makes no sense to write them to the wal
@@ -1101,7 +1119,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
del = new Delete(CellUtil.cloneRow(cell));
del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- del.setAttribute(REPLAY_WRITES, REPLAY_ONLY_INDEX_WRITES);
+ del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
mutations.add(del);
// Since we're replaying existing mutations, it makes no sense to write them to the wal
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index ccefa6e..b39f64d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -40,7 +40,6 @@ import javax.annotation.concurrent.Immutable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -61,6 +60,7 @@ import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexBuilder;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
@@ -996,6 +996,11 @@ public class MutationState implements SQLCloseable {
throw new IOException(e);
}
}
+
+ @Override
+ public List<Mutation> getMutationList() {
+ return mutationBatch; // exposes this command's batch so users of MutateCommand can inspect its mutations
+ }
}, iwe, connection, connection.getQueryServices().getProps());
} else {
hTable.batch(mutationBatch, null);
@@ -1050,7 +1055,12 @@ public class MutationState implements SQLCloseable {
// For an index write failure, the data table write succeeded,
// so when we retry we need to set REPLAY_WRITES
for (Mutation m : mutationList) {
- m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+ if (!PhoenixIndexMetaData.isIndexRebuild(
+ m.getAttributesMap())){
+ m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES
+ );
+ }
PhoenixKeyValueUtil.setTimestamp(m, serverTimestamp);
}
shouldRetry = true;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 517d5d0..8c5184a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -18,10 +18,6 @@
package org.apache.phoenix.hbase.index;
import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_PAUSE;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.DEFAULT_INDEX_WRITER_RPC_RETRIES_NUMBER;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_PAUSE;
-import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER;
import java.io.IOException;
import java.util.ArrayList;
@@ -55,8 +51,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
@@ -84,12 +78,10 @@ import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
-import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -453,7 +445,8 @@ public class Indexer implements RegionObserver, RegionCoprocessor {
}
// No need to write the table mutations when we're rebuilding
// the index as they're already written and just being replayed.
- if (replayWrite == ReplayWrite.INDEX_ONLY) {
+ if (replayWrite == ReplayWrite.INDEX_ONLY
+ || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
miniBatchOp.setOperationStatus(i, NOWRITE);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 58468dc..4b6df89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
/**
* Manage the building of index updates from primary table updates.
@@ -88,6 +90,12 @@ public class IndexBuildManager implements Stoppable {
ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
for (Mutation m : mutations) {
Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
+ if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
+ for (Pair<Mutation, byte[]> update : updates) {
+ update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+ }
+ }
results.addAll(updates);
}
return results;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 7770070..f0379dd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -152,6 +152,24 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
boolean throwing = true;
long timestamp = HConstants.LATEST_TIMESTAMP;
+ // Check whether the failed mutations belong to the Index Rebuilder.
+ // If they do, we throw an exception so that the writes are retried.
+ // If a retry succeeds, we don't update the index state.
+ // Once the retries are exhausted, we transition the index to DISABLE.
+ // This is handled in PhoenixIndexFailurePolicy.doBatchWithRetries.
+ Mutation checkMutationForRebuilder = attempted.entries().iterator().next().getValue();
+ boolean isIndexRebuild =
+ PhoenixIndexMetaData.isIndexRebuild(checkMutationForRebuilder.getAttributesMap());
+ if (isIndexRebuild) {
+ SQLException sqlException =
+ new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_WRITE_FAILURE)
+ .setRootCause(cause).setMessage(cause.getLocalizedMessage()).build()
+ .buildException();
+ IOException ioException = ServerUtil.wrapInDoNotRetryIOException(
+ "Retrying Index rebuild mutation, we will update Index state to DISABLE "
+ + "if all retries are exhusated", sqlException, timestamp);
+ throw ioException;
+ }
try {
timestamp = handleFailureWithExceptions(attempted, cause);
throwing = false;
@@ -166,10 +184,8 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
.setRootCause(cause).setMessage(cause.getLocalizedMessage()).build()
.buildException();
IOException ioException = ServerUtil.wrapInDoNotRetryIOException(null, sqlException, timestamp);
- Mutation m = attempted.entries().iterator().next().getValue();
- boolean isIndexRebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
- // Always throw if rebuilding index since the rebuilder needs to know if it was successful
- if (throwIndexWriteFailure || isIndexRebuild) {
+ // Here we throw index write failure to client so it can retry index mutation.
+ if (throwIndexWriteFailure) {
throw ioException;
} else {
LOG.warn("Swallowing index write failure", ioException);
@@ -427,6 +443,8 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
public static interface MutateCommand {
void doMutation() throws IOException; // performs the batch write; doBatchWithRetries calls it again for each retry
+
+ List<Mutation> getMutationList(); // mutations of the batch; doBatchWithRetries reads element 0, so the list must not be empty
}
/**
@@ -461,7 +479,11 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy {
Thread.sleep(ConnectionUtils.getPauseTime(pause, numRetry)); // HBase's exponential backoff
mutateCommand.doMutation();
// success - change the index state from PENDING_DISABLE back to ACTIVE
- handleIndexWriteSuccessFromClient(iwe, connection);
+ // If it's not Index Rebuild
+ if (!PhoenixIndexMetaData.isIndexRebuild(
+ mutateCommand.getMutationList().get(0).getAttributesMap())){
+ handleIndexWriteSuccessFromClient(iwe, connection);
+ }
return;
} catch (IOException e) {
SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(e);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 46f5b77..75ce9f4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -37,7 +37,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
private final boolean hasLocalIndexes;
public static boolean isIndexRebuild(Map<String,byte[]> attributes) {
- return attributes.get(BaseScannerRegionObserver.REPLAY_WRITES) != null;
+ return java.util.Arrays.equals(attributes.get(BaseScannerRegionObserver.REPLAY_WRITES),
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES); // compare contents: == on byte[] only tests array identity
}
public static ReplayWrite getReplayWrite(Map<String,byte[]> attributes) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 13d2881..ab8cab1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -123,6 +123,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory;
import org.apache.hadoop.hbase.master.HMaster;
@@ -1913,4 +1914,28 @@ public abstract class BaseTest {
Thread.sleep(100);
}
}
+
+ /**
+ * Unassigns and then re-assigns the first region of the given table on a daemon thread.
+ * @param tableName name of the table whose first region is unassigned and re-assigned
+ * @throws IOException declared only; failures on the background thread are printed, not thrown
+ */
+ protected static void unassignRegionAsync(final String tableName) throws IOException {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final Admin admin = utility.getAdmin();
+ final RegionInfo tableRegion =
+ admin.getRegions(TableName.valueOf(tableName)).get(0); // always the first region in the returned list
+ admin.unassign(tableRegion.getEncodedNameAsBytes(), false);
+ admin.assign(tableRegion.getEncodedNameAsBytes());
+ } catch (IOException e) {
+ e.printStackTrace(); // the caller is never notified of this failure
+ }
+ }
+ });
+ thread.setDaemon(true); // daemon thread: does not keep the JVM alive
+ thread.start();
+ }
}