You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2017/09/23 04:20:32 UTC
[2/3] phoenix git commit: PHOENIX-4178 Detect failed index write while rebuilder is running with index staying active
PHOENIX-4178 Detect failed index write while rebuilder is running with index staying active
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/944bed73
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/944bed73
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/944bed73
Branch: refs/heads/master
Commit: 944bed73585a5ff826997895c2da43720b229d8a
Parents: 0a1b33a
Author: James Taylor <jt...@salesforce.com>
Authored: Thu Sep 21 18:24:39 2017 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Sep 22 21:18:17 2017 -0700
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 12 +-
.../end2end/index/PartialIndexRebuilderIT.java | 459 +++++++++++--------
.../coprocessor/MetaDataEndpointImpl.java | 179 ++++----
.../phoenix/coprocessor/MetaDataProtocol.java | 6 +-
.../coprocessor/MetaDataRegionObserver.java | 74 +--
.../apache/phoenix/execute/MutationState.java | 10 +-
.../apache/phoenix/index/IndexMaintainer.java | 25 +-
.../index/PhoenixIndexFailurePolicy.java | 12 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 1 +
.../apache/phoenix/optimize/QueryOptimizer.java | 6 +-
.../phoenix/query/QueryServicesOptions.java | 16 +-
.../org/apache/phoenix/schema/PIndexState.java | 3 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 14 +-
.../phoenix/query/QueryServicesTestImpl.java | 10 +-
.../java/org/apache/phoenix/util/TestUtil.java | 42 +-
15 files changed, 496 insertions(+), 373 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/944bed73/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index dbac5a9..6617466 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -35,7 +36,6 @@ import java.util.Properties;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -137,12 +137,12 @@ public class MutableIndexFailureIT extends BaseTest {
// need to override rpc retries otherwise test doesn't pass
serverProps.put(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, Long.toString(numRpcRetries));
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(forwardOverlapMs));
+ serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, Long.toString(disableTimestampThresholdMs));
/*
* Effectively disable running the index rebuild task by having an infinite delay
* because we want to control it's execution ourselves
*/
serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, Long.toString(Long.MAX_VALUE));
- serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, Long.toString(disableTimestampThresholdMs));
Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
@@ -296,10 +296,10 @@ public class MutableIndexFailureIT extends BaseTest {
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
// the index is only disabled for non-txn tables upon index table write failure
+ String indexState = rs.getString("INDEX_STATE");
if (transactional || leaveIndexActiveOnFailure || localIndex) {
- assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+ assertTrue(PIndexState.ACTIVE.toString().equalsIgnoreCase(indexState) || PIndexState.PENDING_ACTIVE.toString().equalsIgnoreCase(indexState));
} else {
- String indexState = rs.getString("INDEX_STATE");
assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
}
assertFalse(rs.next());
@@ -308,7 +308,7 @@ public class MutableIndexFailureIT extends BaseTest {
// in an all or none manner. If the table is not transactional, then the data writes
// would have succeeded while the index writes would have failed.
if (!transactional) {
- updateTableAgain(conn, leaveIndexActiveOnFailure);
+ updateTableAgain(conn, false);
// Verify previous writes succeeded to data table
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -496,7 +496,7 @@ public class MutableIndexFailureIT extends BaseTest {
public static final String FAIL_TABLE_NAME = "FAIL_TABLE";
@Override
- public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
boolean throwException = false;
if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" + FAIL_INDEX_NAME)
&& FAIL_WRITE) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/944bed73/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index dfe5a28..9095dbe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.Random;
@@ -32,7 +33,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
@@ -40,11 +41,13 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.execute.CommitException;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PMetaData;
@@ -64,23 +67,68 @@ import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
+@SuppressWarnings("deprecation")
@RunWith(RunUntilFailure.class)
public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
+ private static final Logger LOG = LoggerFactory.getLogger(PartialIndexRebuilderIT.class);
private static final Random RAND = new Random(5);
private static final int WAIT_AFTER_DISABLED = 5000;
- private static final int REBUILD_INTERVAL = 2000;
+ private static final long REBUILD_PERIOD = 50000;
+ private static final long REBUILD_INTERVAL = 2000;
+ private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, Boolean.TRUE.toString());
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, Long.toString(REBUILD_INTERVAL));
- serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "120000"); // give up rebuilding after 2 minutes
+ serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "300000"); // give up rebuilding after 5 minutes
+ serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, Long.toString(WAIT_AFTER_DISABLED));
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
+ indexRebuildTaskRegionEnvironment =
+ (RegionCoprocessorEnvironment) getUtility()
+ .getRSForFirstRegionInTable(
+ PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+ .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+ .get(0).getCoprocessorHost()
+ .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+ MetaDataRegionObserver.initRebuildIndexConnectionProps(
+ indexRebuildTaskRegionEnvironment.getConfiguration());
+ }
+
+ private static void runIndexRebuilder() throws InterruptedException, SQLException {
+ BuildIndexScheduleTask task =
+ new MetaDataRegionObserver.BuildIndexScheduleTask(
+ indexRebuildTaskRegionEnvironment);
+ task.run();
+ }
+
+ private static void runIndexRebuilderAsync(final int interval, final boolean[] cancel) {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!cancel[0]) {
+ try {
+ runIndexRebuilder();
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException(e);
+ } catch (SQLException e) {
+ LOG.error(e.getMessage(),e);
+ }
+ }
+ }
+ });
+ thread.setDaemon(true);
+ thread.start();
}
private static void mutateRandomly(final String fullTableName, final int nThreads, final int nRows, final int nIndexValues, final int batchSize, final CountDownLatch doneSignal) {
@@ -134,7 +182,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
}
@Test
- @Repeat(5)
public void testConcurrentUpsertsWithRebuild() throws Throwable {
int nThreads = 5;
final int batchSize = 200;
@@ -155,12 +202,17 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
assertTrue("Ran out of time", doneSignal1.await(120, TimeUnit.SECONDS));
IndexUtil.updateIndexState(fullIndexName, EnvironmentEdgeManager.currentTimeMillis(), metaTable, PIndexState.DISABLE);
- do {
- final CountDownLatch doneSignal2 = new CountDownLatch(nThreads);
- mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2);
- assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS));
- } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
-
+ boolean[] cancel = new boolean[1];
+ try {
+ do {
+ final CountDownLatch doneSignal2 = new CountDownLatch(nThreads);
+ runIndexRebuilderAsync(500,cancel);
+ mutateRandomly(fullTableName, nThreads, nRows, nIndexValues, batchSize, doneSignal2);
+ assertTrue("Ran out of time", doneSignal2.await(500, TimeUnit.SECONDS));
+ } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
+ } finally {
+ cancel[0] = true;
+ }
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
assertEquals(nRows, actualRowCount);
}
@@ -172,17 +224,21 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
private static boolean hasInactiveIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException {
PTable table = metaCache.getTableRef(key).getTable();
for (PTable index : table.getIndexes()) {
- if (index.getIndexState() == PIndexState.ACTIVE) {
+ if (index.getIndexState() == PIndexState.INACTIVE) {
return true;
}
}
return false;
}
-
+
private static boolean hasDisabledIndex(PMetaData metaCache, PTableKey key) throws TableNotFoundException {
+ return hasIndexWithState(metaCache, key, PIndexState.DISABLE);
+ }
+
+ private static boolean hasIndexWithState(PMetaData metaCache, PTableKey key, PIndexState expectedState) throws TableNotFoundException {
PTable table = metaCache.getTableRef(key).getTable();
for (PTable index : table.getIndexes()) {
- if (index.getIndexState() == PIndexState.DISABLE) {
+ if (index.getIndexState() == expectedState) {
return true;
}
}
@@ -304,8 +360,14 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
long disableTS = EnvironmentEdgeManager.currentTimeMillis();
HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
- mutateRandomly(conn, fullTableName, nRows);
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ boolean[] cancel = new boolean[1];
+ try {
+ runIndexRebuilderAsync(500,cancel);
+ mutateRandomly(conn, fullTableName, nRows);
+ TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ } finally {
+ cancel[0] = true;
+ }
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
assertEquals(nRows,actualRowCount);
@@ -346,12 +408,16 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
t.start();
long disableTS = EnvironmentEdgeManager.currentTimeMillis();
IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
- doneSignal.await(60, TimeUnit.SECONDS);
+ boolean[] cancel = new boolean[1];
+ try {
+ runIndexRebuilderAsync(500,cancel);
+ TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ doneSignal.await(60, TimeUnit.SECONDS);
+ } finally {
+ cancel[0] = true;
+ }
assertTrue(hasInactiveIndex[0]);
- TestUtil.dumpIndexStatus(conn, fullIndexName);
-
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
assertEquals(nRows,actualRowCount);
@@ -379,7 +445,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee')");
conn.commit();
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ runIndexRebuilder();
+ Thread.sleep(WAIT_AFTER_DISABLED);
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
@@ -406,7 +475,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
conn.commit();
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ runIndexRebuilder();
+ Thread.sleep(WAIT_AFTER_DISABLED);
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
@@ -433,7 +505,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a',null)");
conn.commit();
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ runIndexRebuilder();
+ Thread.sleep(WAIT_AFTER_DISABLED);
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
@@ -458,7 +533,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.commit();
conn.createStatement().execute("DELETE FROM " + fullTableName);
conn.commit();
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ runIndexRebuilder();
+ Thread.sleep(WAIT_AFTER_DISABLED);
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
@@ -483,7 +561,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
IndexUtil.updateIndexState(fullIndexName, disableTS, metaTable, PIndexState.DISABLE);
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','b')");
conn.commit();
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ runIndexRebuilder();
+ Thread.sleep(WAIT_AFTER_DISABLED);
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
}
@@ -526,9 +607,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
conn.commit();
clock.time += 1000;
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
- clock.time += 100;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+ clock.time += WAIT_AFTER_DISABLED;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
} finally {
EnvironmentEdgeManager.injectEdge(null);
@@ -536,7 +619,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
- public void testSeparateTimeBatchesRequired() throws Throwable {
+ public void testTimeBatchesInCoprocessorRequired() throws Throwable {
String schemaName = generateUniqueName();
String tableName = generateUniqueName();
String indexName = generateUniqueName();
@@ -572,9 +655,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
clock.time += 100;
IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
clock.time += 100;
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
- clock.time += 100;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+ clock.time += WAIT_AFTER_DISABLED;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
} finally {
EnvironmentEdgeManager.injectEdge(null);
@@ -582,7 +667,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
- public void testMultiValuesWhenDisableAndInactive() throws Throwable {
+ public void testBatchingDuringRebuild() throws Throwable {
String schemaName = generateUniqueName();
String tableName = generateUniqueName();
String indexName = generateUniqueName();
@@ -593,55 +678,45 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
EnvironmentEdgeManager.injectEdge(clock);
try (Connection conn = DriverManager.getConnection(getUrl())) {
PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
- conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true");
clock.time += 100;
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2) INCLUDE (v3)");
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0','x')");
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')");
conn.commit();
clock.time += 100;
- try (HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
- // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
- IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
- clock.time += 100;
- long disableTime = clock.currentTime();
- // Set some values while index disabled
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bb', '11','yy')");
- conn.commit();
- clock.time += 100;
- assertTrue(hasDisabledIndex(metaCache, key));
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','222','zzz')");
- conn.commit();
- clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','dddd','3333','zzzz')");
- conn.commit();
- clock.time += 100;
- // Will cause partial index rebuilder to be triggered
- IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
- }
- final CountDownLatch doneSignal = new CountDownLatch(1);
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
- doneSignal.await(30, TimeUnit.SECONDS);
- // Set some values while index is in INACTIVE state
+ HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+ long disableTime = clock.currentTime();
+ IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb', '11')");
conn.commit();
- clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
+ clock.time += REBUILD_PERIOD;
+ assertTrue(hasDisabledIndex(metaCache, key));
+ assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+ assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('ccc','ccc','222')");
conn.commit();
- clock.time += WAIT_AFTER_DISABLED;
- // Enough time has passed, so rebuild will start now
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ assertEquals(3,TestUtil.getRowCount(conn, fullTableName));
+ assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
clock.time += 100;
+
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+ clock.time += WAIT_AFTER_DISABLED;
+ runIndexRebuilder();
+ assertEquals(2,TestUtil.getRowCount(conn, fullIndexName));
+
+ clock.time += REBUILD_PERIOD;
+ runIndexRebuilder();
+ // Verify that other batches were processed
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
} finally {
EnvironmentEdgeManager.injectEdge(null);
}
}
-
- private final static CountDownLatch WAIT_FOR_REBUILD_TO_START = new CountDownLatch(1);
- private final static CountDownLatch WAIT_FOR_INDEX_WRITE = new CountDownLatch(1);
-
+
@Test
public void testUpperBoundSetOnRebuild() throws Throwable {
String schemaName = generateUniqueName();
@@ -665,7 +740,7 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a', '0')");
conn.commit();
// Set clock forward in time past the "overlap" amount we wait for index maintenance to kick in
- clock.time += 10 * WAIT_AFTER_DISABLED;
+ clock.time += 2 * WAIT_AFTER_DISABLED;
assertTrue(hasDisabledIndex(metaCache, key));
assertEquals(1,TestUtil.getRowCount(conn, fullTableName));
assertEquals(0,TestUtil.getRowCount(conn, fullIndexName));
@@ -676,9 +751,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
// Set clock back in time and start rebuild
clock.time = disableTime + 100;
IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
- clock.time += REBUILD_INTERVAL;
- waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE, clock, REBUILD_INTERVAL);
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+ clock.time += WAIT_AFTER_DISABLED;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
// If an upper bound was set on the rebuilder, we should only have found one row
assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
@@ -687,21 +764,8 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
}
- private static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState, MyClock clock, long increment) throws InterruptedException, SQLException {
- int maxTries = 60, nTries = 0;
- do {
- Thread.sleep(1000); // sleep 1 sec
- clock.time += increment;
- if (TestUtil.checkIndexState(conn, fullIndexName, expectedIndexState, 0L)) {
- return;
- }
- } while (++nTries < maxTries);
- fail("Ran out of time waiting for index state to become " + expectedIndexState);
- }
-
-
@Test
- public void testDisableIndexDuringRebuild() throws Throwable {
+ public void testMultiValuesWhenDisableAndInactive() throws Throwable {
String schemaName = generateUniqueName();
String tableName = generateUniqueName();
String indexName = generateUniqueName();
@@ -737,47 +801,21 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
clock.time += 100;
// Will cause partial index rebuilder to be triggered
IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
- final CountDownLatch doneSignal = new CountDownLatch(1);
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
- // Set some values while index is in INACTIVE state
- clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
- conn.commit();
- clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
- conn.commit();
- doneSignal.await(30, TimeUnit.SECONDS);
- // Install coprocessor that will simulate an index write failure during index rebuild
- TestUtil.addCoprocessor(conn,fullIndexName,WriteFailingRegionObserver.class);
- clock.time += WAIT_AFTER_DISABLED;
- doneSignal.await(30, TimeUnit.SECONDS);
- WAIT_FOR_REBUILD_TO_START.await(30, TimeUnit.SECONDS);
- // By using an INDEX_DISABLE_TIMESTAMP of 0, we prevent the partial index rebuilder from triggering
- IndexUtil.updateIndexState(fullIndexName, 0L, metaTable, PIndexState.DISABLE);
- clock.time += 100;
- disableTime = clock.currentTime();
- // Set some values while index disabled
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('b','bbbbb', '11','yy')");
- conn.commit();
- clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','cccccc','222','zzz')");
- conn.commit();
- clock.time += 100;
- conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ddddddd','3333','zzzz')");
- conn.commit();
- clock.time += 100;
- // Simulates another write failure. Should cause current run of rebuilder to fail and retry again later
- IndexUtil.updateIndexState(fullIndexName, disableTime, metaTable, PIndexState.DISABLE);
- removeWriteFailingCoprocessor(conn,fullIndexName);
- WAIT_FOR_INDEX_WRITE.countDown();
}
- // Original rebuilder should have failed
-
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
- clock.time += WAIT_AFTER_DISABLED * 2;
- // Enough time has passed, so rebuild will start now
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+
+ // Set some values while index is in INACTIVE state
clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','eeeee','44444','zzzzz')");
+ conn.commit();
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','fffff','55555','zzzzzz')");
+ conn.commit();
+ clock.time += WAIT_AFTER_DISABLED;
+ // Enough time has passed, so rebuild will start now
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
} finally {
EnvironmentEdgeManager.injectEdge(null);
@@ -785,6 +823,100 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
+ public void testIndexWriteFailureDisablingIndex() throws Throwable {
+ testIndexWriteFailureDuringRebuild(PIndexState.DISABLE);
+ }
+
+ @Test
+ public void testIndexWriteFailureLeavingIndexActive() throws Throwable {
+ testIndexWriteFailureDuringRebuild(PIndexState.PENDING_ACTIVE);
+ }
+
+ private void testIndexWriteFailureDuringRebuild(PIndexState indexStateOnFailure) throws Throwable {
+ String schemaName = generateUniqueName();
+ String tableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+ final String fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+ PTableKey key = new PTableKey(null,fullTableName);
+ final MyClock clock = new MyClock(1000);
+ EnvironmentEdgeManager.injectEdge(clock);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ PMetaData metaCache = conn.unwrap(PhoenixConnection.class).getMetaDataCache();
+ conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR, v3 VARCHAR) COLUMN_ENCODED_BYTES = 0, DISABLE_INDEX_ON_WRITE_FAILURE = " + (indexStateOnFailure == PIndexState.DISABLE));
+ clock.time += 100;
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','a','0')");
+ conn.commit();
+ clock.time += 100;
+ HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
+
+ long disableTime = clock.currentTime();
+ // Simulates an index write failure
+ IndexUtil.updateIndexState(fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? disableTime : -disableTime, metaTable, indexStateOnFailure);
+
+ clock.time += 100;
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('bb','bb', '11')");
+ conn.commit();
+
+ // Large enough to be in separate time batch
+ clock.time += 2 * REBUILD_PERIOD;
+ assertTrue(hasIndexWithState(metaCache, key, indexStateOnFailure));
+ assertEquals(2,TestUtil.getRowCount(conn, fullTableName));
+ assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('ccc','ccc','222')");
+ conn.commit();
+ assertEquals(3,TestUtil.getRowCount(conn, fullTableName));
+ assertEquals(1,TestUtil.getRowCount(conn, fullIndexName));
+ clock.time += 100;
+
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE, null));
+ clock.time += WAIT_AFTER_DISABLED;
+
+ // First batch should have been processed
+ runIndexRebuilder();
+ assertEquals(2,TestUtil.getRowCount(conn, fullIndexName));
+
+ // Simulate write failure
+ TestUtil.addCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class);
+ conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('dddd','dddd','3333')");
+ try {
+ conn.commit();
+ fail();
+ } catch (CommitException e) {
+ // Expected
+ }
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure, null));
+ PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+ ResultSet rs = stmt.executeQuery("SELECT V2 FROM " + fullTableName + " WHERE V1 = 'a'");
+ assertTrue(rs.next());
+ assertEquals("0", rs.getString(1));
+ assertEquals(indexStateOnFailure == PIndexState.DISABLE ? fullTableName : fullIndexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
+ TestUtil.removeCoprocessor(conn, fullIndexName, WriteFailingRegionObserver.class);
+
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, indexStateOnFailure == PIndexState.DISABLE ? PIndexState.INACTIVE : PIndexState.ACTIVE, null));
+ clock.time += WAIT_AFTER_DISABLED;
+
+ // First batch should have been processed again because we started over
+ runIndexRebuilder();
+ assertEquals(3,TestUtil.getRowCount(conn, fullIndexName));
+
+ clock.time += 2 * REBUILD_PERIOD;
+ // Second batch should have been processed now
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
+
+ // Verify that other batches were processed
+ IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+ } finally {
+ EnvironmentEdgeManager.injectEdge(null);
+ }
+ }
+
+ @Test
public void testDeleteAndUpsertValuesAtSameTS1() throws Throwable {
String schemaName = generateUniqueName();
String tableName = generateUniqueName();
@@ -808,9 +940,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc')");
conn.commit();
clock.time += 1000;
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
- clock.time += 100;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+ clock.time += WAIT_AFTER_DISABLED;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
} finally {
EnvironmentEdgeManager.injectEdge(null);
@@ -841,9 +975,11 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
conn.createStatement().execute("DELETE FROM " + fullTableName + " WHERE k='a'");
conn.commit();
clock.time += 1000;
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock);
- TestUtil.waitForIndexRebuild(conn, fullIndexName, PIndexState.ACTIVE);
- clock.time += 100;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null));
+ clock.time += WAIT_AFTER_DISABLED;
+ runIndexRebuilder();
+ assertTrue(TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
} finally {
EnvironmentEdgeManager.injectEdge(null);
@@ -873,65 +1009,10 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
}
}
- private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock) throws InterruptedException {
- final CountDownLatch doneSignal = new CountDownLatch(1);
- advanceClockUntilPartialRebuildStarts(fullIndexName, clock, doneSignal);
- clock.time += WAIT_AFTER_DISABLED + 1000;
- doneSignal.await(30, TimeUnit.SECONDS);
- }
-
- private static void advanceClockUntilPartialRebuildStarts(final String fullIndexName, final MyClock clock, final CountDownLatch doneSignal) {
- Runnable r = new Runnable() {
- @Override
- public void run() {
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- int nTries = 10;
- while (--nTries >0 && !TestUtil.checkIndexState(conn, fullIndexName, PIndexState.INACTIVE, null)) {
- Thread.sleep(1000);
- clock.time += 1000;
- }
- doneSignal.countDown();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- Thread t = new Thread(r);
- t.setDaemon(true);
- t.start();
- }
-
- private static void removeWriteFailingCoprocessor(Connection conn, String tableName) throws Exception {
- ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
- HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName));
- descriptor.removeCoprocessor(WriteFailingRegionObserver.class.getName());
- int numTries = 10;
- try (HBaseAdmin admin = services.getAdmin()) {
- admin.modifyTable(Bytes.toBytes(tableName), descriptor);
- while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor)
- && numTries > 0) {
- numTries--;
- if (numTries == 0) {
- throw new Exception(
- "Check to detect if delaying co-processor was removed failed after "
- + numTries + " retries.");
- }
- Thread.sleep(1000);
- }
- }
- }
-
public static class WriteFailingRegionObserver extends SimpleRegionObserver {
@Override
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- WAIT_FOR_REBUILD_TO_START.countDown();
- try {
- WAIT_FOR_INDEX_WRITE.await(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw new IOException(e);
- }
+ throw new DoNotRetryIOException("Simulating write failure on " + c.getEnvironment().getRegionInfo().getTable().getNameAsString());
}
}
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/944bed73/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5131a77..80b0785 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -515,7 +515,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
- PTable table = doGetTable(key, request.getClientTimestamp());
+ PTable table = doGetTable(key, request.getClientTimestamp(), request.getClientVersion());
if (table == null) {
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
builder.setMutationTime(currentTime);
@@ -527,7 +527,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
long minNonZerodisableIndexTimestamp = disableIndexTimestamp > 0 ? disableIndexTimestamp : Long.MAX_VALUE;
for (PTable index : table.getIndexes()) {
disableIndexTimestamp = index.getIndexDisableTimestamp();
- if (disableIndexTimestamp > 0 && index.getIndexState() == PIndexState.ACTIVE && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
+ if (disableIndexTimestamp > 0 && (index.getIndexState() == PIndexState.ACTIVE || index.getIndexState() == PIndexState.PENDING_ACTIVE) && disableIndexTimestamp < minNonZerodisableIndexTimestamp) {
minNonZerodisableIndexTimestamp = disableIndexTimestamp;
}
}
@@ -554,7 +554,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
private PTable buildTable(byte[] key, ImmutableBytesPtr cacheKey, Region region,
- long clientTimeStamp) throws IOException, SQLException {
+ long clientTimeStamp, int clientVersion) throws IOException, SQLException {
Scan scan = MetaDataUtil.newTableRowsScan(key, MIN_TABLE_TIMESTAMP, clientTimeStamp);
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
try (RegionScanner scanner = region.getScanner(scan)) {
@@ -563,7 +563,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PTable newTable;
boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
- newTable = getTable(scanner, clientTimeStamp, tableTimeStamp);
+ newTable = getTable(scanner, clientTimeStamp, tableTimeStamp, clientVersion);
if (newTable == null) {
return null;
}
@@ -646,9 +646,9 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
- private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes) throws IOException, SQLException {
+ private void addIndexToTable(PName tenantId, PName schemaName, PName indexName, PName tableName, long clientTimeStamp, List<PTable> indexes, int clientVersion) throws IOException, SQLException {
byte[] key = SchemaUtil.getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName.getBytes(), indexName.getBytes());
- PTable indexTable = doGetTable(key, clientTimeStamp);
+ PTable indexTable = doGetTable(key, clientTimeStamp, clientVersion);
if (indexTable == null) {
ServerUtil.throwIOException("Index not found", new TableNotFoundException(schemaName.getString(), indexName.getString()));
return;
@@ -795,7 +795,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
arguments.add(arg);
}
- private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp)
+ private PTable getTable(RegionScanner scanner, long clientTimeStamp, long tableTimeStamp, int clientVersion)
throws IOException, SQLException {
List<Cell> results = Lists.newArrayList();
scanner.next(results);
@@ -896,6 +896,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PIndexState indexState =
indexStateKv == null ? null : PIndexState.fromSerializedValue(indexStateKv
.getValueArray()[indexStateKv.getValueOffset()]);
+ // If client is not yet up to 4.12, then translate PENDING_ACTIVE to ACTIVE (as would have been
+ // the value in those versions) since the client won't have this index state in its enum.
+ if (indexState == PIndexState.PENDING_ACTIVE && clientVersion < PhoenixDatabaseMetaData.MIN_PENDING_ACTIVE_INDEX) {
+ indexState = PIndexState.ACTIVE;
+ }
Cell immutableRowsKv = tableKeyValues[IMMUTABLE_ROWS_INDEX];
boolean isImmutableRows =
immutableRowsKv == null ? false : (Boolean) PBoolean.INSTANCE.toObject(
@@ -981,7 +986,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
} else if (Bytes.compareTo(LINK_TYPE_BYTES, 0, LINK_TYPE_BYTES.length, colKv.getQualifierArray(), colKv.getQualifierOffset(), colKv.getQualifierLength())==0) {
LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]);
if (linkType == LinkType.INDEX_TABLE) {
- addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes);
+ addIndexToTable(tenantId, schemaName, famName, tableName, clientTimeStamp, indexes, clientVersion);
} else if (linkType == LinkType.PHYSICAL_TABLE) {
physicalTables.add(famName);
} else if (linkType == LinkType.PARENT_TABLE) {
@@ -1247,13 +1252,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
private PTable loadTable(RegionCoprocessorEnvironment env, byte[] key,
- ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp)
+ ImmutableBytesPtr cacheKey, long clientTimeStamp, long asOfTimeStamp, int clientVersion)
throws IOException, SQLException {
Region region = env.getRegion();
Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache = GlobalCache.getInstance(this.env).getMetaDataCache();
PTable table = (PTable)metaDataCache.getIfPresent(cacheKey);
// We always cache the latest version - fault in if not in cache
- if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp)) != null) {
+ if (table != null || (table = buildTable(key, cacheKey, region, asOfTimeStamp, clientVersion)) != null) {
return table;
}
// if not found then check if newer table already exists and add delete marker for timestamp
@@ -1358,6 +1363,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] schemaName = null;
byte[] tableName = null;
try {
+ int clientVersion = request.getClientVersion();
List<Mutation> tableMetadata = ProtobufUtil.getMutations(request);
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantIdBytes = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -1419,7 +1425,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
acquireLock(region, parentTableKey, locks);
parentCacheKey = new ImmutableBytesPtr(parentTableKey);
parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp,
- clientTimeStamp);
+ clientTimeStamp, clientVersion);
if (parentTable == null || isTableDeleted(parentTable)) {
builder.setReturnCode(MetaDataProtos.MutationCode.PARENT_TABLE_NOT_FOUND);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
@@ -1452,7 +1458,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// Get as of latest timestamp so we can detect if we have a newer table that already
// exists without making an additional query
PTable table =
- loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP);
+ loadTable(env, tableKey, cacheKey, clientTimeStamp, HConstants.LATEST_TIMESTAMP, clientVersion);
if (table != null) {
if (table.getTimeStamp() < clientTimeStamp) {
// If the table is older than the client time stamp and it's deleted,
@@ -1641,8 +1647,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private void findAllChildViews(Region region, byte[] tenantId, PTable table,
- TableViewFinder result, long clientTimeStamp) throws IOException, SQLException {
- TableViewFinder currResult = findChildViews(region, tenantId, table);
+ TableViewFinder result, long clientTimeStamp, int clientVersion) throws IOException, SQLException {
+ TableViewFinder currResult = findChildViews(region, tenantId, table, clientVersion);
result.addResult(currResult);
for (ViewInfo viewInfo : currResult.getViewInfoList()) {
byte[] viewtenantId = viewInfo.getTenantId();
@@ -1650,8 +1656,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
byte[] viewTable = viewInfo.getViewName();
byte[] tableKey = SchemaUtil.getTableKey(viewtenantId, viewSchema, viewTable);
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
- PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp);
- findAllChildViews(region, viewtenantId, view, result, clientTimeStamp);
+ PTable view = loadTable(env, tableKey, cacheKey, clientTimeStamp, clientTimeStamp, clientVersion);
+ findAllChildViews(region, viewtenantId, view, result, clientTimeStamp, clientVersion);
}
}
@@ -1775,7 +1781,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private static final byte[] PHYSICAL_TABLE_BYTES =
new byte[] { PTable.LinkType.PHYSICAL_TABLE.getSerializedValue() };
- private TableViewFinder findChildViews(Region region, byte[] tenantId, PTable table)
+ private TableViewFinder findChildViews(Region region, byte[] tenantId, PTable table, int clientVersion)
throws IOException, SQLException {
byte[] tableKey =
SchemaUtil.getTableKey(ByteUtil.EMPTY_BYTE_ARRAY,
@@ -1784,7 +1790,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(tableKey);
PTable systemCatalog =
loadTable(env, tableKey, cacheKey, MIN_SYSTEM_TABLE_TIMESTAMP,
- HConstants.LATEST_TIMESTAMP);
+ HConstants.LATEST_TIMESTAMP, clientVersion);
if (systemCatalog.getTimeStamp() < MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0) {
return findChildViews_deprecated(region, tenantId, table, PHYSICAL_TABLE_BYTES);
} else {
@@ -1842,7 +1848,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
result =
doDropTable(key, tenantIdBytes, schemaName, tableName, parentTableName,
PTableType.fromSerializedValue(tableType), tableMetadata,
- invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade);
+ invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, isCascade, request.getClientVersion());
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
done.run(MetaDataMutationResult.toProto(result));
return;
@@ -1874,7 +1880,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] schemaName,
byte[] tableName, byte[] parentTableName, PTableType tableType, List<Mutation> rowsToDelete,
List<ImmutableBytesPtr> invalidateList, List<RowLock> locks,
- List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade) throws IOException, SQLException {
+ List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, boolean isCascade, int clientVersion) throws IOException, SQLException {
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(rowsToDelete);
@@ -1887,7 +1893,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// We always cache the latest version - fault in if not in cache
if (table != null
- || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) != null) {
+ || (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) != null) {
if (table.getTimeStamp() < clientTimeStamp) {
if (isTableDeleted(table) || tableType != table.getType()) {
return new MetaDataMutationResult(MutationCode.TABLE_NOT_FOUND, EnvironmentEdgeManager.currentTimeMillis(), null);
@@ -1924,7 +1930,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if (tableType == PTableType.TABLE || tableType == PTableType.SYSTEM) {
// Handle any child views that exist
- TableViewFinder tableViewFinderResult = findChildViews(region, tenantId, table);
+ TableViewFinder tableViewFinderResult = findChildViews(region, tenantId, table, clientVersion);
if (tableViewFinderResult.hasViews()) {
if (isCascade) {
if (tableViewFinderResult.allViewsInMultipleRegions()) {
@@ -1944,7 +1950,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
acquireLock(region, viewKey, locks);
MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName,
viewName, null, PTableType.VIEW, rowsToDelete, invalidateList, locks,
- tableNamesToDelete, sharedTablesToDelete, false);
+ tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) { return result; }
}
}
@@ -2007,7 +2013,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
acquireLock(region, indexKey, locks);
MetaDataMutationResult result =
doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX,
- rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false);
+ rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
if (result.getMutationCode() != MutationCode.TABLE_ALREADY_EXISTS) {
return result;
}
@@ -2025,7 +2031,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
private MetaDataMutationResult
- mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator) throws IOException {
+ mutateColumn(List<Mutation> tableMetadata, ColumnMutator mutator, int clientVersion) throws IOException {
byte[][] rowKeyMetaData = new byte[5][];
MetaDataUtil.getTenantIdAndSchemaAndTableName(tableMetadata, rowKeyMetaData);
byte[] tenantId = rowKeyMetaData[PhoenixDatabaseMetaData.TENANT_ID_INDEX];
@@ -2059,7 +2065,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// Get client timeStamp from mutations
long clientTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
if (table == null
- && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP)) == null) {
+ && (table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion)) == null) {
// if not found then call newerTableExists and add delete marker for timestamp
// found
table = buildDeletedTable(key, cacheKey, region, clientTimeStamp);
@@ -2130,7 +2136,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if (result !=null) {
return result;
} else {
- table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
+ table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table);
}
} finally {
@@ -2289,7 +2295,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private MetaDataMutationResult addColumnsAndTablePropertiesToChildViews(PTable basePhysicalTable, List<Mutation> tableMetadata, List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
List<ImmutableBytesPtr> invalidateList, long clientTimeStamp, TableViewFinder childViewsResult,
- Region region, List<RowLock> locks) throws IOException, SQLException {
+ Region region, List<RowLock> locks, int clientVersion) throws IOException, SQLException {
List<PutWithOrdinalPosition> columnPutsForBaseTable = Lists.newArrayListWithExpectedSize(tableMetadata.size());
Map<TableProperty, Cell> tablePropertyCellMap = Maps.newHashMapWithExpectedSize(tableMetadata.size());
// Isolate the puts relevant to adding columns. Also figure out what kind of columns are being added.
@@ -2336,7 +2342,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// lock the rows corresponding to views so that no other thread can modify the view meta-data
RowLock viewRowLock = acquireLock(region, viewKey, locks);
- PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
+ PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion);
ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList();
List<PColumn> viewPkCols = new ArrayList<>(view.getPKColumns());
@@ -2635,7 +2641,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PTable basePhysicalTable, List<RowLock> locks, List<Mutation> tableMetadata,
List<Mutation> mutationsForAddingColumnsToViews, byte[] schemaName, byte[] tableName,
List<ImmutableBytesPtr> invalidateList, long clientTimeStamp,
- TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete)
+ TableViewFinder childViewsResult, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, int clientVersion)
throws IOException, SQLException {
List<Delete> columnDeletesForBaseTable = new ArrayList<>(tableMetadata.size());
// Isolate the deletes relevant to dropping columns. Also figure out what kind of columns
@@ -2661,7 +2667,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// lock the rows corresponding to views so that no other thread can modify the view
// meta-data
RowLock viewRowLock = acquireLock(region, viewKey, locks);
- PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock);
+ PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion);
ColumnOrdinalPositionUpdateList ordinalPositionList =
new ColumnOrdinalPositionUpdateList();
@@ -2732,7 +2738,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
dropIndexes(view, region, invalidateList, locks, clientTimeStamp,
schemaName, view.getName().getBytes(),
mutationsForAddingColumnsToViews, existingViewColumn,
- tableNamesToDelete, sharedTablesToDelete);
+ tableNamesToDelete, sharedTablesToDelete, clientVersion);
}
}
@@ -2954,7 +2960,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
List<Mutation> mutationsForAddingColumnsToViews = Lists.newArrayListWithExpectedSize(tableMetaData.size() * ( 1 + table.getIndexes().size()));
if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
TableViewFinder childViewsResult = new TableViewFinder();
- findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp);
+ findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion());
if (childViewsResult.hasViews()) {
/*
* Dis-allow if:
@@ -2979,7 +2985,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
} else {
mutationsForAddingColumnsToViews = new ArrayList<>(childViewsResult.getViewInfoList().size() * tableMetaData.size());
MetaDataMutationResult mutationResult = addColumnsAndTablePropertiesToChildViews(table, tableMetaData, mutationsForAddingColumnsToViews, schemaName, tableName, invalidateList, clientTimeStamp,
- childViewsResult, region, locks);
+ childViewsResult, region, locks, request.getClientVersion());
// return if we were not able to add the column successfully
if (mutationResult!=null)
return mutationResult;
@@ -3070,7 +3076,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
tableMetaData.addAll(mutationsForAddingColumnsToViews);
return null;
}
- });
+ }, request.getClientVersion());
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
}
@@ -3081,11 +3087,11 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
}
- private PTable doGetTable(byte[] key, long clientTimeStamp) throws IOException, SQLException {
- return doGetTable(key, clientTimeStamp, null);
+ private PTable doGetTable(byte[] key, long clientTimeStamp, int clientVersion) throws IOException, SQLException {
+ return doGetTable(key, clientTimeStamp, null, clientVersion);
}
- private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock) throws IOException, SQLException {
+ private PTable doGetTable(byte[] key, long clientTimeStamp, RowLock rowLock, int clientVersion) throws IOException, SQLException {
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
GlobalCache.getInstance(this.env).getMetaDataCache();
@@ -3137,13 +3143,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
return table;
}
// Query for the latest table first, since it's not cached
- table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP);
+ table = buildTable(key, cacheKey, region, HConstants.LATEST_TIMESTAMP, clientVersion);
if ((table != null && table.getTimeStamp() < clientTimeStamp) ||
(blockWriteRebuildIndex && table.getIndexDisableTimestamp() > 0)) {
return table;
}
// Otherwise, query for an older version of the table - it won't be cached
- return buildTable(key, cacheKey, region, clientTimeStamp);
+ return buildTable(key, cacheKey, region, clientTimeStamp, clientVersion);
} finally {
if (!wasLocked) rowLock.release();
}
@@ -3210,7 +3216,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
@Override
- public void dropColumn(RpcController controller, DropColumnRequest request,
+ public void dropColumn(RpcController controller, final DropColumnRequest request,
RpcCallback<MetaDataResponse> done) {
List<Mutation> tableMetaData = null;
final List<byte[]> tableNamesToDelete = Lists.newArrayList();
@@ -3232,13 +3238,13 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PTableType type = table.getType();
if (type == PTableType.TABLE || type == PTableType.SYSTEM) {
TableViewFinder childViewsResult = new TableViewFinder();
- findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp);
+ findAllChildViews(region, tenantId, table, childViewsResult, clientTimeStamp, request.getClientVersion());
if (childViewsResult.hasViews()) {
MetaDataMutationResult mutationResult =
dropColumnsFromChildViews(region, table,
locks, tableMetaData, additionalTableMetaData,
schemaName, tableName, invalidateList,
- clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete);
+ clientTimeStamp, childViewsResult, tableNamesToDelete, sharedTablesToDelete, request.getClientVersion());
// return if we were not able to drop the column successfully
if (mutationResult != null) return mutationResult;
}
@@ -3296,7 +3302,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
dropIndexes(table, region, invalidateList, locks,
clientTimeStamp, schemaName, tableName,
additionalTableMetaData, columnToDelete,
- tableNamesToDelete, sharedTablesToDelete);
+ tableNamesToDelete, sharedTablesToDelete, request.getClientVersion());
} catch (ColumnFamilyNotFoundException e) {
return new MetaDataMutationResult(
MutationCode.COLUMN_NOT_FOUND, EnvironmentEdgeManager
@@ -3319,7 +3325,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
long currentTime = MetaDataUtil.getClientTimeStamp(tableMetaData);
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, null, tableNamesToDelete, sharedTablesToDelete);
}
- });
+ }, request.getClientVersion());
if (result != null) {
done.run(MetaDataMutationResult.toProto(result));
}
@@ -3333,7 +3339,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
private void dropIndexes(PTable table, Region region, List<ImmutableBytesPtr> invalidateList,
List<RowLock> locks, long clientTimeStamp, byte[] schemaName,
byte[] tableName, List<Mutation> additionalTableMetaData, PColumn columnToDelete,
- List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete)
+ List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete, int clientVersion)
throws IOException, SQLException {
// Look for columnToDelete in any indexes. If found as PK column, get lock and drop the
// index and then invalidate it
@@ -3372,7 +3378,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
additionalTableMetaData.add(new Delete(linkKey, clientTimeStamp));
doDropTable(indexKey, tenantId, index.getSchemaName().getBytes(), index
.getTableName().getBytes(), tableName, index.getType(),
- additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false);
+ additionalTableMetaData, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion);
invalidateList.add(new ImmutableBytesPtr(indexKey));
}
// If the dropped column is a covered index column, invalidate the index
@@ -3438,7 +3444,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
done.run(MetaDataMutationResult.toProto(result));
return;
}
- long timeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
+ long timeStamp = HConstants.LATEST_TIMESTAMP;
ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key);
List<Cell> newKVs = tableMetadata.get(0).getFamilyCellMap().get(TABLE_FAMILY_BYTES);
Cell newKV = null;
@@ -3450,6 +3456,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
INDEX_STATE_BYTES, 0, INDEX_STATE_BYTES.length) == 0){
newKV = cell;
indexStateKVIndex = index;
+ timeStamp = cell.getTimestamp();
} else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
INDEX_DISABLE_TIMESTAMP_BYTES, 0, INDEX_DISABLE_TIMESTAMP_BYTES.length) == 0) {
disableTimeStampKVIndex = index;
@@ -3464,7 +3471,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
try {
Get get = new Get(key);
- get.setTimeRange(PTable.INITIAL_SEQ_NUM, timeStamp);
get.addColumn(TABLE_FAMILY_BYTES, DATA_TABLE_NAME_BYTES);
get.addColumn(TABLE_FAMILY_BYTES, INDEX_STATE_BYTES);
get.addColumn(TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES);
@@ -3484,6 +3490,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
PIndexState currentState =
PIndexState.fromSerializedValue(currentStateKV.getValueArray()[currentStateKV
.getValueOffset()]);
+ // Timestamp of INDEX_STATE gets updated with each call
+ long actualTimestamp = currentStateKV.getTimestamp();
long curTimeStampVal = 0;
if ((currentDisableTimeStamp != null && currentDisableTimeStamp.getValueLength() > 0)) {
curTimeStampVal = (Long) PLong.INSTANCE.toObject(currentDisableTimeStamp.getValueArray(),
@@ -3491,32 +3499,28 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
// new DisableTimeStamp is passed in
if (disableTimeStampKVIndex >= 0) {
Cell newDisableTimeStampCell = newKVs.get(disableTimeStampKVIndex);
+ long expectedTimestamp = newDisableTimeStampCell.getTimestamp();
+ // If the index status has been updated after the upper bound of the scan we use
+ // to partially rebuild the index, then we need to fail the rebuild because an
+ // index write failed before the rebuild was complete.
+ if (actualTimestamp > expectedTimestamp) {
+ builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
+ builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
+ done.run(builder.build());
+ return;
+ }
long newDisableTimeStamp = (Long) PLong.INSTANCE.toObject(newDisableTimeStampCell.getValueArray(),
newDisableTimeStampCell.getValueOffset(), newDisableTimeStampCell.getValueLength());
- // We never set the INDEX_DISABLE_TIMESTAMP to a positive value when we're setting the state to ACTIVE.
- // Instead, we're passing in what we expect the INDEX_DISABLE_TIMESTAMP to be currently. If it's
- // changed, it means that a data table row failed to write while we were partially rebuilding it
- // and we must rerun it.
- if (newState == PIndexState.ACTIVE && newDisableTimeStamp > 0) {
- // Don't allow setting to ACTIVE if the INDEX_DISABLE_TIMESTAMP doesn't match
- // what we expect.
- if (newDisableTimeStamp != Math.abs(curTimeStampVal)) {
- builder.setReturnCode(MetaDataProtos.MutationCode.UNALLOWED_TABLE_MUTATION);
- builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
- done.run(builder.build());
- return;
- }
- // Reset INDEX_DISABLE_TIMESTAMP_BYTES to zero as we're good to go.
- newKVs.set(disableTimeStampKVIndex,
- CellUtil.createCell(key, TABLE_FAMILY_BYTES, INDEX_DISABLE_TIMESTAMP_BYTES,
- timeStamp, KeyValue.Type.Put.getCode(), PLong.INSTANCE.toBytes(0L)));
- }
// We use the sign of the INDEX_DISABLE_TIMESTAMP to differentiate the keep-index-active (negative)
// from block-writes-to-data-table case. In either case, we want to keep the oldest timestamp to
// drive the partial index rebuild rather than update it with each attempt to update the index
// when a new data table write occurs.
- if (curTimeStampVal != 0 && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
- // not reset disable timestamp
+ // We do legitimately move the INDEX_DISABLE_TIMESTAMP to be newer when we're rebuilding the
+ // index in which case the state will be INACTIVE or PENDING_ACTIVE.
+ if (curTimeStampVal != 0
+ && (newState == PIndexState.DISABLE || newState == PIndexState.PENDING_ACTIVE)
+ && Math.abs(curTimeStampVal) < Math.abs(newDisableTimeStamp)) {
+ // do not reset disable timestamp as we want to keep the min
newKVs.remove(disableTimeStampKVIndex);
disableTimeStampKVIndex = -1;
}
@@ -3548,29 +3552,42 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
if (currentState == PIndexState.BUILDING && newState != PIndexState.ACTIVE) {
timeStamp = currentStateKV.getTimestamp();
}
- if ((currentState == PIndexState.UNUSABLE && newState == PIndexState.ACTIVE)
- || (currentState == PIndexState.ACTIVE && newState == PIndexState.UNUSABLE)) {
+ if ((currentState == PIndexState.ACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.UNUSABLE) {
newState = PIndexState.INACTIVE;
newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
- } else if (currentState == PIndexState.INACTIVE && newState == PIndexState.USABLE) {
- newState = PIndexState.ACTIVE;
+ } else if ((currentState == PIndexState.INACTIVE || currentState == PIndexState.PENDING_ACTIVE) && newState == PIndexState.USABLE) {
+ // Don't allow manual state change to USABLE (i.e. ACTIVE) if non zero INDEX_DISABLE_TIMESTAMP
+ if (curTimeStampVal != 0) {
+ newState = currentState;
+ } else {
+ newState = PIndexState.ACTIVE;
+ }
newKVs.set(indexStateKVIndex, KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
INDEX_STATE_BYTES, timeStamp, Bytes.toBytes(newState.getSerializedValue())));
}
PTable returnTable = null;
if (currentState != newState || disableTimeStampKVIndex != -1) {
+ // make a copy of tableMetadata so we can add to it
+ tableMetadata = new ArrayList<Mutation>(tableMetadata);
+ // Always include the empty column value at latest timestamp so
+ // that clients pull over update.
+ Put emptyValue = new Put(key);
+ emptyValue.addColumn(TABLE_FAMILY_BYTES,
+ QueryConstants.EMPTY_COLUMN_BYTES,
+ HConstants.LATEST_TIMESTAMP,
+ QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ tableMetadata.add(emptyValue);
byte[] dataTableKey = null;
- if(dataTableKV != null) {
- dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, dataTableKV.getValue());
- }
- if(dataTableKey != null) {
- // make a copy of tableMetadata
- tableMetadata = new ArrayList<Mutation>(tableMetadata);
+ if (dataTableKV != null) {
+ dataTableKey = SchemaUtil.getTableKey(tenantId, schemaName, CellUtil.cloneValue(dataTableKV));
// insert an empty KV to trigger time stamp update on data table row
Put p = new Put(dataTableKey);
- p.add(TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, timeStamp, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ p.addColumn(TABLE_FAMILY_BYTES,
+ QueryConstants.EMPTY_COLUMN_BYTES,
+ HConstants.LATEST_TIMESTAMP,
+ QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
tableMetadata.add(p);
}
boolean setRowKeyOrderOptimizableCell = newState == PIndexState.BUILDING && !rowKeyOrderOptimizable;
@@ -3589,7 +3606,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
}
if (setRowKeyOrderOptimizableCell || disableTimeStampKVIndex != -1
|| currentState == PIndexState.DISABLE || newState == PIndexState.BUILDING) {
- returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock);
+ returnTable = doGetTable(key, HConstants.LATEST_TIMESTAMP, rowLock, request.getClientVersion());
}
}
// Get client timeStamp from mutations, since it may get updated by the