You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/02/24 22:13:44 UTC
[26/50] [abbrv] phoenix git commit: PHOENIX-2635 Partial index
rebuild doesn't work for mutable data
PHOENIX-2635 Partial index rebuild doesn't work for mutable data
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/046bda34
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/046bda34
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/046bda34
Branch: refs/heads/calcite
Commit: 046bda34771aaec3befd4ad17024afc5af9b83ed
Parents: e2a6386
Author: James Taylor <jt...@salesforce.com>
Authored: Mon Feb 15 00:33:05 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Feb 15 10:14:54 2016 -0800
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 379 +++++++------------
.../end2end/index/ReadOnlyIndexFailureIT.java | 75 ++--
.../EndToEndCoveredColumnsIndexBuilderIT.java | 2 +-
.../coprocessor/BaseScannerRegionObserver.java | 5 +-
.../coprocessor/MetaDataRegionObserver.java | 120 +++++-
.../hbase/index/covered/LocalTableState.java | 19 +-
.../phoenix/hbase/index/covered/TableState.java | 7 +-
.../index/covered/data/LocalHBaseState.java | 6 +-
.../hbase/index/covered/data/LocalTable.java | 9 +-
.../example/CoveredColumnIndexCodec.java | 4 +-
.../hbase/index/scanner/ScannerBuilder.java | 1 -
.../apache/phoenix/index/IndexMaintainer.java | 4 +-
.../apache/phoenix/index/PhoenixIndexCodec.java | 12 +-
.../phoenix/index/PhoenixIndexMetaData.java | 10 +-
.../index/PhoenixTransactionalIndexer.java | 2 +-
.../org/apache/phoenix/jdbc/PhoenixDriver.java | 32 +-
.../apache/phoenix/parse/NamedTableNode.java | 8 +
.../phoenix/query/QueryServicesOptions.java | 2 +-
.../apache/phoenix/schema/MetaDataClient.java | 34 +-
.../org/apache/phoenix/util/PhoenixRuntime.java | 8 +-
.../index/covered/TestLocalTableState.java | 10 +-
.../example/TestCoveredColumnIndexCodec.java | 4 +-
22 files changed, 368 insertions(+), 385 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 176c5a0..ebc6988 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -30,24 +30,17 @@ import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseCluster;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.query.QueryServices;
@@ -61,7 +54,6 @@ import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -75,28 +67,29 @@ import com.google.common.collect.Maps;
* For some reason dropping tables after running this test
* fails unless it runs its own mini cluster.
*
- *
- * @since 2.1
*/
@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
- private Timer scheduleTimer;
-
+ public static volatile boolean FAIL_WRITE = false;
+ public static final String INDEX_NAME = "IDX";
+
private String tableName;
private String indexName;
private String fullTableName;
private String fullIndexName;
- private boolean transactional;
+ private final boolean transactional;
+ private final boolean localIndex;
private final String tableDDLOptions;
- public MutableIndexFailureIT(boolean transactional) {
+ public MutableIndexFailureIT(boolean transactional, boolean localIndex) {
this.transactional = transactional;
+ this.localIndex = localIndex;
this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : "";
- this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "");
- this.indexName = "IDX";
+ this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "");
+ this.indexName = INDEX_NAME;
this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
}
@@ -104,31 +97,28 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
- serverProps.put("hbase.client.retries.number", "2");
+ serverProps.put("hbase.coprocessor.region.classes", FailingRegionObserver.class.getName());
+ serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put("hbase.client.pause", "5000");
+ serverProps.put("data.tx.snapshot.dir", "/tmp");
serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0");
Map<String, String> clientProps = Collections.singletonMap(QueryServices.TRANSACTIONS_ENABLED, "true");
NUM_SLAVES_BASE = 4;
setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
}
- @Parameters(name = "transactional = {0}")
+ @Parameters(name = "transactional = {0}, localIndex = {1}")
public static Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] { { false }, { true } });
- }
-
- @Test
- public void testWriteFailureDisablesLocalIndex() throws Exception {
- helpTestWriteFailureDisablesIndex(true);
+ return Arrays.asList(new Boolean[][] { { false, false }, { false, true }, { true, false }, { true, true } });
}
@Test
public void testWriteFailureDisablesIndex() throws Exception {
- helpTestWriteFailureDisablesIndex(false);
+ helpTestWriteFailureDisablesIndex();
}
- public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Exception {
+ public void helpTestWriteFailureDisablesIndex() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = driver.connect(url, props)) {
String query;
@@ -140,15 +130,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
- if(localIndex) {
- conn.createStatement().execute(
- "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
- conn.createStatement().execute(
- "CREATE LOCAL INDEX " + indexName+ "_2" + " ON " + fullTableName + " (v2) INCLUDE (v1)");
- } else {
- conn.createStatement().execute(
- "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
- }
+ FAIL_WRITE = false;
+ conn.createStatement().execute(
+ "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
@@ -167,23 +151,50 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setString(2, "x");
stmt.setString(3, "1");
stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setString(2, "y");
+ stmt.setString(3, "2");
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setString(2, "z");
+ stmt.setString(3, "3");
+ stmt.execute();
conn.commit();
- TableName indexTable =
- TableName.valueOf(localIndex ? MetaDataUtil
- .getLocalIndexTableName(fullTableName) : fullIndexName);
- HBaseAdmin admin = getUtility().getHBaseAdmin();
- HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
- try{
- admin.disableTable(indexTable);
- admin.deleteTable(indexTable);
- } catch (TableNotFoundException ignore) {}
+ query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ String expectedPlan =
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
+ assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+ rs = conn.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("x", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertEquals("y", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
+ assertFalse(rs.next());
+
+ FAIL_WRITE = true;
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a2");
+ // Insert new row
+ stmt.setString(1, "d");
+ stmt.setString(2, "d");
+ stmt.setString(3, "4");
+ stmt.execute();
+ // Update existing row
+ stmt.setString(1, "a");
stmt.setString(2, "x2");
stmt.setString(3, "2");
stmt.execute();
+ // Delete existing row
+ stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
+ stmt.setString(1, "b");
+ stmt.execute();
try {
conn.commit();
fail();
@@ -196,20 +207,17 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
// the index is only disabled for non-txn tables upon index table write failure
- PIndexState indexState = transactional ? PIndexState.ACTIVE : PIndexState.DISABLE;
- assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
- if(localIndex) {
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2",
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName + "_2", rs.getString(3));
- assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
+ if (transactional) {
+ assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+ } else {
+ String indexState = rs.getString("INDEX_STATE");
+ assertTrue(PIndexState.DISABLE.toString().equals(indexState) || PIndexState.INACTIVE.toString().equals(indexState));
}
+ assertFalse(rs.next());
- // if the table is transactional the write to the index table will fail because the
- // index has not been disabled
+ // If the table is transactional the write to both the data and index table will fail
+ // in an all or none manner. If the table is not transactional, then the data writes
+ // would have succeeded while the index writes would have failed.
if (!transactional) {
// Verify UPSERT on data table still work after index is disabled
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
@@ -218,210 +226,101 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.setString(3, "3");
stmt.execute();
conn.commit();
- }
- if (transactional) {
- // if the table was transactional there should be 1 row (written before the index
- // was disabled)
- query = "SELECT /*+ NO_INDEX */ v2 FROM " + fullTableName;
+ // Verify previous writes succeeded to data table
+ query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
- String expectedPlan =
+ expectedPlan =
"CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
- assertEquals("1", rs.getString(1));
- assertFalse(rs.next());
- } else {
- // if the table was not transactional there should be three rows (all writes to data
- // table should succeed)
- query = "SELECT v2 FROM " + fullTableName;
- rs = conn.createStatement().executeQuery("EXPLAIN " + query);
- String expectedPlan =
- "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
- assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
- rs = conn.createStatement().executeQuery(query);
+ assertEquals("a", rs.getString(1));
+ assertEquals("x2", rs.getString(2));
assertTrue(rs.next());
- assertEquals("1", rs.getString(1));
+ assertEquals("a3", rs.getString(1));
+ assertEquals("x3", rs.getString(2));
assertTrue(rs.next());
- assertEquals("2", rs.getString(1));
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
assertTrue(rs.next());
- assertEquals("3", rs.getString(1));
+ assertEquals("d", rs.getString(1));
+ assertEquals("d", rs.getString(2));
assertFalse(rs.next());
}
- // recreate index table
- admin.createTable(indexTableDesc);
- do {
- Thread.sleep(15 * 1000); // sleep 15 secs
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
- break;
- }
- if(localIndex) {
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2",
+ // re-enable index table
+ FAIL_WRITE = false;
+
+ boolean isActive = false;
+ if (!transactional) {
+ int maxTries = 3, nTries = 0;
+ do {
+ Thread.sleep(15 * 1000); // sleep 15 secs
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+ isActive = true;
break;
}
- }
- } while(true);
+ } while(++nTries < maxTries);
+ assertTrue(isActive);
+ }
// Verify UPSERT on data table still work after index table is recreated
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a4");
+ stmt.setString(1, "a3");
stmt.setString(2, "x4");
stmt.setString(3, "4");
stmt.execute();
conn.commit();
- // verify index table has data
- query = "SELECT count(1) FROM " + fullIndexName;
+ // verify index table has correct data
+ query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + fullTableName;
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ expectedPlan =
+ " OVER " + (localIndex ? MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + tableName : fullIndexName);
+ String explainPlan = QueryUtil.getExplainPlan(rs);
+ assertTrue(explainPlan.contains(expectedPlan));
rs = conn.createStatement().executeQuery(query);
- assertTrue(rs.next());
-
- // for txn tables there will be only one row in the index (a4)
- // for non txn tables there will be three rows because we only partially build index
- // from where we failed and the oldest
- // index row has been deleted when we dropped the index table during test
- assertEquals(transactional ? 1 : 3, rs.getInt(1));
- }
- }
-
-
- @Ignore("See PHOENIX-2332")
- @Test
- public void testWriteFailureWithRegionServerDown() throws Exception {
- String query;
- ResultSet rs;
-
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = driver.connect(url, props);) {
- conn.setAutoCommit(false);
- conn.createStatement().execute(
- "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "+tableDDLOptions);
- query = "SELECT * FROM " + fullTableName;
- rs = conn.createStatement().executeQuery(query);
- assertFalse(rs.next());
-
- conn.createStatement().execute(
- "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
- query = "SELECT * FROM " + fullIndexName;
- rs = conn.createStatement().executeQuery(query);
- assertFalse(rs.next());
-
- // Verify the metadata for index is correct.
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- assertEquals(indexName, rs.getString(3));
- assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
- assertFalse(rs.next());
-
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a");
- stmt.setString(2, "x");
- stmt.setString(3, "1");
- stmt.execute();
- conn.commit();
-
- // find a RS which doesn't has CATALOG table
- TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
- TableName indexTable = TableName.valueOf(fullIndexName);
- final HBaseCluster cluster = getUtility().getHBaseCluster();
- Collection<ServerName> rss = cluster.getClusterStatus().getServers();
- HBaseAdmin admin = getUtility().getHBaseAdmin();
- List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
- ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(),
- regions.get(0).getRegionName());
- ServerName metaRS = cluster.getServerHoldingMeta();
- ServerName rsToBeKilled = null;
-
- // find first RS isn't holding META or CATALOG table
- for(ServerName curRS : rss) {
- if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
- rsToBeKilled = curRS;
- break;
- }
- }
- assertTrue(rsToBeKilled != null);
-
- regions = admin.getTableRegions(indexTable);
- final HRegionInfo indexRegion = regions.get(0);
- final ServerName dstRS = rsToBeKilled;
- admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
- getUtility().waitFor(30000, 200, new Waiter.Predicate<Exception>() {
- @Override
- public boolean evaluate() throws Exception {
- ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(),
- indexRegion.getRegionName());
- return (sn != null && sn.equals(dstRS));
- }
- });
-
- // use timer sending updates in every 10ms
- this.scheduleTimer = new Timer(true);
- this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn, fullTableName), 0, 10);
- // let timer sending some updates
- Thread.sleep(100);
-
- // kill RS hosting index table
- getUtility().getHBaseCluster().killRegionServer(rsToBeKilled);
-
- // wait for index table completes recovery
- getUtility().waitUntilAllRegionsAssigned(indexTable);
-
- // Verify the metadata for index is correct.
- do {
- Thread.sleep(15 * 1000); // sleep 15 secs
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
- new String[] { PTableType.INDEX.toString() });
+ if (transactional) { // failed commit does not get retried
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("x", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a3", rs.getString(1));
+ assertEquals("x4", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertEquals("y", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
+ assertFalse(rs.next());
+ } else { // failed commit eventually succeeds
+ assertTrue(rs.next());
+ assertEquals("d", rs.getString(1));
+ assertEquals("d", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("x2", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a3", rs.getString(1));
+ assertEquals("x4", rs.getString(2));
assertTrue(rs.next());
- if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
- break;
- }
- } while(true);
- this.scheduleTimer.cancel();
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
+ assertFalse(rs.next());
+ }
}
}
-
- static class SendingUpdatesScheduleTask extends TimerTask {
- private static final Log LOG = LogFactory.getLog(SendingUpdatesScheduleTask.class);
-
- // inProgress is to prevent timer from invoking a new task while previous one is still
- // running
- private final static AtomicInteger inProgress = new AtomicInteger(0);
- private final Connection conn;
- private final String fullTableName;
- private int inserts = 0;
-
- public SendingUpdatesScheduleTask(Connection conn, String fullTableName) {
- this.conn = conn;
- this.fullTableName = fullTableName;
- }
-
+
+ public static class FailingRegionObserver extends SimpleRegionObserver {
@Override
- public void run() {
- if(inProgress.get() > 0){
- return;
- }
-
- try {
- inProgress.incrementAndGet();
- inserts++;
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a" + inserts);
- stmt.setString(2, "x" + inserts);
- stmt.setString(3, String.valueOf(inserts));
- stmt.execute();
- conn.commit();
- } catch (Throwable t) {
- LOG.warn("ScheduledBuildIndexTask failed!", t);
- } finally {
- inProgress.decrementAndGet();
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+ throw new DoNotRetryIOException();
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
index 8df82ce..931fcae 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ReadOnlyIndexFailureIT.java
@@ -27,19 +27,20 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.phoenix.end2end.BaseOwnClusterHBaseManagedTimeIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -56,6 +57,9 @@ import org.apache.phoenix.util.TestUtil;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Maps;
/**
@@ -69,28 +73,37 @@ import com.google.common.collect.Maps;
*/
@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
- private static final String FAIL_ON_FIRST_PUT = "bbb";
+ public static volatile boolean FAIL_WRITE = false;
+ public static final String INDEX_NAME = "IDX";
private String tableName;
private String indexName;
private String fullTableName;
private String fullIndexName;
+ private final boolean localIndex;
- public ReadOnlyIndexFailureIT() {
- this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
- this.indexName = "IDX";
+ public ReadOnlyIndexFailureIT(boolean localIndex) {
+ this.localIndex = localIndex;
+ this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME;
+ this.indexName = INDEX_NAME;
this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
}
+ @Parameters(name = "localIndex = {0}")
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][] { { false }, { true } });
+ }
+
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
- serverProps.put("hbase.client.retries.number", "2");
+ serverProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+ serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
serverProps.put("hbase.client.pause", "5000");
serverProps.put("hbase.balancer.period", String.valueOf(Integer.MAX_VALUE));
- serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, "0");
serverProps.put(QueryServices.INDEX_FAILURE_BLOCK_WRITE, "true");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, "true");
serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, "1000");
@@ -105,16 +118,11 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
}
@Test
- public void testWriteFailureReadOnlyLocalIndex() throws Exception {
- helpTestWriteFailureReadOnlyIndex(true);
- }
-
- @Test
public void testWriteFailureReadOnlyIndex() throws Exception {
- helpTestWriteFailureReadOnlyIndex(false);
+ helpTestWriteFailureReadOnlyIndex();
}
- public void helpTestWriteFailureReadOnlyIndex(boolean localIndex) throws Exception {
+ public void helpTestWriteFailureReadOnlyIndex() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
try (Connection conn = driver.connect(url, props)) {
String query;
@@ -126,6 +134,7 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
+ FAIL_WRITE = false;
if(localIndex) {
conn.createStatement().execute(
"CREATE LOCAL INDEX " + indexName + " ON " + fullTableName
@@ -157,9 +166,10 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
stmt.execute();
conn.commit();
+ FAIL_WRITE = true;
stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
stmt.setString(1, "2");
- stmt.setString(2, FAIL_ON_FIRST_PUT);
+ stmt.setString(2, "bbb");
stmt.setString(3, "b2");
stmt.execute();
try {
@@ -201,6 +211,7 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
assertEquals(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE.getErrorCode(), e.getErrorCode());
}
+ FAIL_WRITE = false;
// Second attempt at writing will succeed
int retries = 0;
do {
@@ -222,12 +233,12 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
conn.commit();
// verify index table has data
- query = "SELECT count(1) FROM " + indexName;
+ query = "SELECT count(1) FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
- query = "SELECT v1 FROM " + fullTableName;
+ query = "SELECT /*+ INDEX(" + indexName + ") */ v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertTrue(rs.next());
assertEquals("aaa", rs.getString(1));
@@ -261,29 +272,13 @@ public class ReadOnlyIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
return (!rs.wasNull() && ts > 0);
}
+
public static class FailingRegionObserver extends SimpleRegionObserver {
- private Integer failCount = new Integer(0);
-
@Override
- public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
- final Durability durability) throws HBaseIOException {
- if (shouldFailUpsert(c, put)) {
- synchronized (failCount) {
- failCount++;
- if (failCount.intValue() == 1) {
- // throwing anything other than instances of IOException result
- // in this coprocessor being unloaded
- // DoNotRetryIOException tells HBase not to retry this mutation
- // multiple times
- throw new DoNotRetryIOException();
- }
- }
+ public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+ if (c.getEnvironment().getRegionInfo().getTable().getNameAsString().contains(INDEX_NAME) && FAIL_WRITE) {
+ throw new DoNotRetryIOException();
}
}
-
- private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
- return Bytes.contains(put.getRow(), Bytes.toBytes(FAIL_ON_FIRST_PUT));
- }
-
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 2396719..fe2f1b4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -148,7 +148,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
public void verify(TableState state) {
try {
Scanner kvs =
- ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst();
+ ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns), false).getFirst();
int count = 0;
Cell kv;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9487b36..a5533af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -58,10 +58,10 @@ import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ServerUtil;
-import co.cask.tephra.Transaction;
-
import com.google.common.collect.ImmutableList;
+import co.cask.tephra.Transaction;
+
abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
@@ -98,6 +98,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK";
public static final String TX_SCN = "_TxScn";
public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow";
+ public static final String IGNORE_NEWER_MUTATIONS = "_IGNORE_NEWER_MUTATIONS";
/**
* Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 4e019cd..0cce4d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.TimerTask;
@@ -31,37 +32,53 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.parse.AlterIndexStatement;
+import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.UpgradeUtil;
+import com.google.common.collect.Lists;
+
/**
* Coprocessor for metadata related operations. This coprocessor would only be registered
@@ -190,8 +207,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
// separately, all updating the same data.
RegionScanner scanner = null;
PhoenixConnection conn = null;
- boolean blockWriteRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_BLOCK_WRITE,
- QueryServicesOptions.DEFAULT_INDEX_FAILURE_BLOCK_WRITE);
if (inProgress.get() > 0) {
LOG.debug("New ScheduledBuildIndexTask skipped as there is already one running");
return;
@@ -213,9 +228,13 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
scan.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
+ PTable dataPTable = null;
+ MetaDataClient client = null;
boolean hasMore = false;
List<Cell> results = new ArrayList<Cell>();
+ List<PTable> indexesToPartiallyRebuild = Collections.emptyList();
scanner = this.env.getRegion().getScanner(scan);
+ long earliestDisableTimestamp = Long.MAX_VALUE;
do {
results.clear();
@@ -226,16 +245,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
byte[] disabledTimeStamp = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES);
- Long disabledTimeStampVal = 0L;
if (disabledTimeStamp == null || disabledTimeStamp.length == 0) {
continue;
}
// disableTimeStamp has to be a positive value
- disabledTimeStampVal = (Long) PLong.INSTANCE.toObject(disabledTimeStamp);
+ long disabledTimeStampVal = PLong.INSTANCE.getCodec().decodeLong(disabledTimeStamp, 0, SortOrder.getDefault());
if (disabledTimeStampVal <= 0) {
continue;
}
+ if (disabledTimeStampVal < earliestDisableTimestamp) {
+ earliestDisableTimestamp = disabledTimeStampVal;
+ }
byte[] dataTable = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
@@ -247,12 +268,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
continue;
}
- if (!blockWriteRebuildIndex && ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0)
- && (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) {
- // index has to be either in disable or inactive state
- continue;
- }
-
byte[][] rowKeyMetaData = new byte[3][];
SchemaUtil.getVarChars(r.getRow(), 3, rowKeyMetaData);
byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX];
@@ -266,34 +281,101 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
if (conn == null) {
final Properties props = new Properties();
+ props.setProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB, Boolean.TRUE.toString());
// Set SCN so that we don't ping server and have the upper bound set back to
// the timestamp when the failure occurred.
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
// don't run a second index populations upsert select
props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0");
conn = DriverManager.getConnection(getJdbcUrl(env), props).unwrap(PhoenixConnection.class);
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
+ dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+ indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size());
+ client = new MetaDataClient(conn);
}
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTable);
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTable);
- PTable dataPTable = PhoenixRuntime.getTable(conn, dataTableFullName);
PTable indexPTable = PhoenixRuntime.getTable(conn, indexTableFullName);
if (!MetaDataUtil.tableRegionsOnline(this.env.getConfiguration(), indexPTable)) {
LOG.debug("Index rebuild has been skipped because not all regions of index table="
+ indexPTable.getName() + " are online.");
continue;
}
+ // Allow index to begin incremental maintenance as index is back online and we
+ // cannot transition directly from DISABLED -> ACTIVE
+ if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) == 0) {
+ AlterIndexStatement statement = new AlterIndexStatement(
+ NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
+ dataPTable.getTableName().getString(),
+ false, PIndexState.INACTIVE);
+ client.alterIndex(statement);
+ }
+ indexesToPartiallyRebuild.add(indexPTable);
+ } while (hasMore);
- MetaDataClient client = new MetaDataClient(conn);
+ if (!indexesToPartiallyRebuild.isEmpty()) {
long overlapTime = env.getConfiguration().getLong(
QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB,
QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME);
- long timeStamp = Math.max(0, disabledTimeStampVal - overlapTime);
+ long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
- LOG.info("Starting to build index=" + indexPTable.getName() + " from timestamp=" + timeStamp);
- client.buildPartialIndexFromTimeStamp(indexPTable, new TableRef(dataPTable, Long.MAX_VALUE, timeStamp), blockWriteRebuildIndex);
-
- } while (hasMore);
+ LOG.info("Starting to build indexes=" + indexesToPartiallyRebuild + " from timestamp=" + timeStamp);
+ Scan dataTableScan = new Scan();
+ dataTableScan.setRaw(true);
+ dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
+ byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
+ try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
+ Result result;
+ try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
+ int batchSize = conn.getMutateBatchSize();
+ List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+ ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+ IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild, conn);
+ byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+ byte[] uuidValue = ServerCacheClient.generateId();
+
+ while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
+ Put put = null;
+ Delete del = null;
+ for (Cell cell : result.rawCells()) {
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null) {
+ put = new Put(CellUtil.cloneRow(cell));
+ put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+ mutations.add(put);
+ }
+ put.add(cell);
+ } else {
+ if (del == null) {
+ del = new Delete(CellUtil.cloneRow(cell));
+ del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+ del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES);
+ mutations.add(del);
+ }
+ del.addDeleteMarker(cell);
+ }
+ }
+ if (mutations.size() == batchSize) {
+ dataHTable.batch(mutations);
+ uuidValue = ServerCacheClient.generateId();
+ }
+ }
+ if (!mutations.isEmpty()) {
+ dataHTable.batch(mutations);
+ }
+ }
+ }
+ for (PTable indexPTable : indexesToPartiallyRebuild) {
+ AlterIndexStatement statement = new AlterIndexStatement(
+ NamedTableNode.create(indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()),
+ dataPTable.getTableName().getString(),
+ false, PIndexState.ACTIVE);
+ client.alterIndex(statement);
+ }
+ }
} catch (Throwable t) {
LOG.warn("ScheduledBuildIndexTask failed!", t);
} finally {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index c4ed7a0..2739cc2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -36,8 +36,6 @@ import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
-import com.google.inject.Key;
-
/**
* Manage the state of the HRegion's view of the table, for the single row.
* <p>
@@ -108,7 +106,7 @@ public class LocalTableState implements TableState {
public void setCurrentTimestamp(long timestamp) {
this.ts = timestamp;
}
-
+
public void resetTrackedColumns() {
this.trackedColumns.clear();
}
@@ -139,6 +137,9 @@ public class LocalTableState implements TableState {
* request - you will never see a column with the timestamp we are tracking, but the next oldest
* timestamp for that column.
* @param indexedColumns the columns to that will be indexed
+ * @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful
+ * when replaying mutation state for partial index rebuild where writes succeeded to the data
+ * table, but not to the index table.
* @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
* the builder. Even if no update is necessary for the requested columns, you still need
* to return the {@link IndexUpdate}, just don't set the update for the
@@ -146,8 +147,8 @@ public class LocalTableState implements TableState {
* @throws IOException
*/
public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
- Collection<? extends ColumnReference> indexedColumns) throws IOException {
- ensureLocalStateInitialized(indexedColumns);
+ Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations) throws IOException {
+ ensureLocalStateInitialized(indexedColumns, ignoreNewerMutations);
// filter out things with a newer timestamp and track the column references to which it applies
ColumnTracker tracker = new ColumnTracker(indexedColumns);
synchronized (this.trackedColumns) {
@@ -167,7 +168,7 @@ public class LocalTableState implements TableState {
* {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
* then, there is still fairly low contention as each new Put/Delete will have its own table state.
*/
- private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns)
+ private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
throws IOException {
// check to see if we haven't initialized any columns yet
Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
@@ -175,7 +176,7 @@ public class LocalTableState implements TableState {
if (toCover.isEmpty()) { return; }
// add the current state of the row
- this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
+ this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false);
// add the covered columns to the set
for (ColumnReference ref : toCover) {
@@ -268,9 +269,9 @@ public class LocalTableState implements TableState {
}
@Override
- public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations)
throws IOException {
- Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns);
+ Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns, ignoreNewerMutations);
ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
index 0e961db..bd4bdfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -58,9 +57,13 @@ public interface TableState {
/**
* Get a getter interface for the state of the index row
+ * @param indexedColumns list of indexed columns.
+ * @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful
+ * when replaying mutation state for partial index rebuild where writes succeeded to the data
+ * table, but not to the index table.
*/
Pair<ValueGetter, IndexUpdate> getIndexUpdateState(
- Collection<? extends ColumnReference> indexedColumns) throws IOException;
+ Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations) throws IOException;
/**
* @return the row key for the current row for which we are building an index update.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
index 6d20c18..9968627 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -23,7 +23,6 @@ import java.util.Collection;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
/**
@@ -35,13 +34,16 @@ public interface LocalHBaseState {
* @param m mutation for which we should get the current table state
* @param toCover all the columns the current row state needs to cover; hint the underlying lookup
* to save getting all the columns for the row
+ * @param ignoreNewerMutations ignore mutations newer than m when determining current state. Useful
+ * when replaying mutation state for partial index rebuild where writes succeeded to the data
+ * table, but not to the index table.
* @return the full state of the given row. Includes all current versions (even if they are not
* usually visible to the client (unless they are also doing a raw scan)). Never returns a
* <tt>null</tt> {@link Result} - instead, when there is not data for the row, returns a
* {@link Result} with no stored {@link KeyValue}s.
* @throws IOException if there is an issue reading the row
*/
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean ignoreNewerMutations)
throws IOException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
index 549fe8c..003df2a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -52,13 +51,19 @@ public class LocalTable implements LocalHBaseState {
}
@Override
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns)
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns, boolean ignoreNewerMutations)
throws IOException {
byte[] row = m.getRow();
// need to use a scan here so we can get raw state, which Get doesn't provide.
Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
s.setStartRow(row);
s.setStopRow(row);
+ if (ignoreNewerMutations) {
+ // Provides a means of client indicating that newer cells should not be considered,
+ // enabling mutations to be replayed to partially rebuild the index when a write fails.
+ long ts = m.getFamilyCellMap().firstEntry().getValue().get(0).getTimestamp();
+ s.setTimeRange(0,ts);
+ }
Region region = this.env.getRegion();
RegionScanner scanner = region.getScanner(s);
List<Cell> kvs = new ArrayList<Cell>(1);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 4efca9f..0f960e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -77,7 +77,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
List<CoveredColumn> refs = group.getColumns();
try {
- Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+ Pair<Scanner, IndexUpdate> stateInfo = ((LocalTableState)state).getIndexedColumnsTableState(refs, false);
Scanner kvs = stateInfo.getFirst();
Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs, state.getCurrentRowKey());
// make sure we close the scanner
@@ -132,7 +132,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
List<CoveredColumn> refs = group.getColumns();
try {
- Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs);
+ Pair<Scanner, IndexUpdate> kvs = ((LocalTableState)state).getIndexedColumnsTableState(refs, false);
Pair<Integer, List<ColumnEntry>> columns = getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
// make sure we close the scanner reference
kvs.getFirst().close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
index e120268..b4282ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -59,7 +59,6 @@ public class ScannerBuilder {
public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
- // TODO: This needs to use some form of the filter that Tephra has when transactional
Filter columnFilters = getColumnFilters(indexedColumns);
FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 4d545a2..13ad7e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -89,14 +89,14 @@ import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TrustedByteArrayOutputStream;
-import co.cask.tephra.TxConstants;
-
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import co.cask.tephra.TxConstants;
+
/**
*
* Class that builds index row key from data row key and current state of
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
index 7acc90c..8ad4d3e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java
@@ -15,8 +15,8 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
@@ -59,7 +59,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData context) throws IOException {
- List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+ PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
+ List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
if (indexMaintainers.get(0).isRowDeleted(state.getPendingUpdate())) {
return Collections.emptyList();
}
@@ -67,7 +68,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
ptr.set(state.getCurrentRowKey());
List<IndexUpdate> indexUpdates = Lists.newArrayList();
for (IndexMaintainer maintainer : indexMaintainers) {
- Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+ Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
ValueGetter valueGetter = statePair.getFirst();
IndexUpdate indexUpdate = statePair.getSecond();
indexUpdate.setTable(maintainer.getIndexTableName());
@@ -81,7 +82,8 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) throws IOException {
- List<IndexMaintainer> indexMaintainers = ((PhoenixIndexMetaData)context).getIndexMaintainers();
+ PhoenixIndexMetaData metaData = (PhoenixIndexMetaData)context;
+ List<IndexMaintainer> indexMaintainers = metaData.getIndexMaintainers();
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
ptr.set(state.getCurrentRowKey());
List<IndexUpdate> indexUpdates = Lists.newArrayList();
@@ -90,7 +92,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec {
// to aid in rollback if there's a KeyValue column in the index. The alternative would be
// to hold on to all uncommitted index row keys (even ones already sent to HBase) on the
// client side.
- Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns());
+ Pair<ValueGetter, IndexUpdate> statePair = state.getIndexUpdateState(maintainer.getAllColumns(), metaData.ignoreNewerMutations());
ValueGetter valueGetter = statePair.getFirst();
IndexUpdate indexUpdate = statePair.getSecond();
indexUpdate.setTable(maintainer.getIndexTableName());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 60ae915..4fab674 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -22,8 +22,6 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
-import co.cask.tephra.Transaction;
-
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.cache.GlobalCache;
@@ -39,9 +37,12 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ServerUtil;
+import co.cask.tephra.Transaction;
+
public class PhoenixIndexMetaData implements IndexMetaData {
private final Map<String, byte[]> attributes;
private final IndexMetaDataCache indexMetaDataCache;
+ private final boolean ignoreNewerMutations;
private static IndexMetaDataCache getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> attributes) throws IOException {
if (attributes == null) { return IndexMetaDataCache.EMPTY_INDEX_META_DATA_CACHE; }
@@ -87,6 +88,7 @@ public class PhoenixIndexMetaData implements IndexMetaData {
public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, Map<String,byte[]> attributes) throws IOException {
this.indexMetaDataCache = getIndexMetaData(env, attributes);
this.attributes = attributes;
+ this.ignoreNewerMutations = attributes.get(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS) != null;
}
public Transaction getTransaction() {
@@ -100,4 +102,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
public Map<String, byte[]> getAttributes() {
return attributes;
}
+
+ public boolean ignoreNewerMutations() {
+ return ignoreNewerMutations;
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 26f9725..e4c106e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -486,7 +486,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
@Override
- public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+ public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns, boolean ignoreNewerMutations)
throws IOException {
// TODO: creating these objects over and over again is wasteful
ColumnTracker tracker = new ColumnTracker(indexedColumns);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
index 22b02c5..2c33d21 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDriver.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.query.HBaseFactoryProvider;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesImpl;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -224,20 +225,23 @@ public final class PhoenixDriver extends PhoenixEmbeddedDriver {
connectionQueryServices = prevValue;
}
}
- boolean success = false;
- SQLException sqlE = null;
- try {
- connectionQueryServices.init(url, info);
- success = true;
- } catch (SQLException e) {
- sqlE = e;
- }
- finally {
- if (!success) {
- // Remove from map, as initialization failed
- connectionQueryServicesMap.remove(normalizedConnInfo);
- if (sqlE != null) {
- throw sqlE;
+ String noUpgradeProp = info.getProperty(PhoenixRuntime.NO_UPGRADE_ATTRIB);
+ if (!Boolean.TRUE.equals(noUpgradeProp)) {
+ boolean success = false;
+ SQLException sqlE = null;
+ try {
+ connectionQueryServices.init(url, info);
+ success = true;
+ } catch (SQLException e) {
+ sqlE = e;
+ }
+ finally {
+ if (!success) {
+ // Remove from map, as initialization failed
+ connectionQueryServicesMap.remove(normalizedConnInfo);
+ if (sqlE != null) {
+ throw sqlE;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
index 4e0906f..d3b4505 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/NamedTableNode.java
@@ -39,6 +39,14 @@ public class NamedTableNode extends ConcreteTableNode {
return new NamedTableNode(alias, name, dynColumns);
}
+ public static NamedTableNode create (TableName name) {
+ return new NamedTableNode(null, name, Collections.<ColumnDef>emptyList());
+ }
+
+ public static NamedTableNode create (String schemaName, String tableName) {
+ return new NamedTableNode(null, TableName.create(schemaName, tableName), Collections.<ColumnDef>emptyList());
+ }
+
NamedTableNode(String alias, TableName name) {
super(alias, name);
dynColumns = Collections.<ColumnDef> emptyList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 62297ee..27c5693 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -154,7 +154,7 @@ public class QueryServicesOptions {
public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 10000; // 10 secs
- public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 300000; // 5 mins
+ public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME = 1; // 1 ms
/**
* HConstants#HIGH_QOS is the max we will see to a standard table. We go higher to differentiate
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 6409dcd..7f3f850 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -204,8 +204,6 @@ import org.apache.phoenix.util.UpgradeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import co.cask.tephra.TxConstants;
-
import com.google.common.base.Objects;
import com.google.common.collect.Iterators;
import com.google.common.collect.ListMultimap;
@@ -214,6 +212,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
+import co.cask.tephra.TxConstants;
+
public class MetaDataClient {
private static final Logger logger = LoggerFactory.getLogger(MetaDataClient.class);
@@ -1096,36 +1096,6 @@ public class MetaDataClient {
}
/**
- * Rebuild indexes from a timestamp which is the value from hbase row key timestamp field
- */
- public void buildPartialIndexFromTimeStamp(PTable index, TableRef dataTableRef, boolean blockWriteRebuildIndex) throws SQLException {
- boolean needRestoreIndexState = true;
- AlterIndexStatement indexStatement = null;
- if (!blockWriteRebuildIndex) {
- // Need to change index state from Disable to InActive when build index partially so that
- // new changes will be indexed during index rebuilding
- indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTableRef.getTable().getTableName().getString(), false, PIndexState.INACTIVE);
- alterIndex(indexStatement);
- }
- try {
- buildIndex(index, dataTableRef);
- needRestoreIndexState = false;
- } finally {
- if(needRestoreIndexState) {
- if (!blockWriteRebuildIndex) {
- // reset index state to disable
- indexStatement = FACTORY.alterIndex(FACTORY.namedTable(null,
- TableName.create(index.getSchemaName().getString(), index.getTableName().getString())),
- dataTableRef.getTable().getTableName().getString(), false, PIndexState.DISABLE);
- alterIndex(indexStatement);
- }
- }
- }
- }
-
- /**
* Create an index table by morphing the CreateIndexStatement into a CreateTableStatement and calling
* MetaDataClient.createTable. In doing so, we perform the following translations:
* 1) Change the type of any columns being indexed to types that support null if the column is nullable.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index bebdd8c..05ba6d2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -130,6 +130,11 @@ public class PhoenixRuntime {
public static final String TENANT_ID_ATTRIB = "TenantId";
/**
+ * Use this connection property to prevent an upgrade from occurring when
+ * connecting to a new server version.
+ */
+ public static final String NO_UPGRADE_ATTRIB = "NoUpgrade";
+ /**
* Use this connection property to control the number of rows that are
* batched together on an UPSERT INTO table1... SELECT ... FROM table2.
* It's only used when autoCommit is true and your source table is
@@ -163,7 +168,8 @@ public class PhoenixRuntime {
UPSERT_BATCH_SIZE_ATTRIB,
AUTO_COMMIT_ATTRIB,
CONSISTENCY_ATTRIB,
- REQUEST_METRIC_ATTRIB
+ REQUEST_METRIC_ATTRIB,
+ NO_UPGRADE_ATTRIB
};
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index fa8bd85..a2e45af 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -91,7 +91,7 @@ public class TestLocalTableState {
ColumnReference col = new ColumnReference(fam, qual);
table.setCurrentTimestamp(ts);
//check that our value still shows up first on scan, even though this is a lazy load
- Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
Scanner s = p.getFirst();
assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next());
}
@@ -135,13 +135,13 @@ public class TestLocalTableState {
ColumnReference col = new ColumnReference(fam, qual);
table.setCurrentTimestamp(ts);
// check that the value is there
- Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
Scanner s = p.getFirst();
assertEquals("Didn't get the pending mutation's value first", kv, s.next());
// rollback that value
table.rollback(Arrays.asList(kv));
- p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
s = p.getFirst();
assertEquals("Didn't correctly rollback the row - still found it!", null, s.next());
Mockito.verify(env, Mockito.times(1)).getRegion();
@@ -179,14 +179,14 @@ public class TestLocalTableState {
ColumnReference col = new ColumnReference(fam, qual);
table.setCurrentTimestamp(ts);
// check that the value is there
- Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
Scanner s = p.getFirst();
// make sure it read the table the one time
assertEquals("Didn't get the stored keyvalue!", storedKv, s.next());
// on the second lookup it shouldn't access the underlying table again - the cached columns
// should know they are done
- p = table.getIndexedColumnsTableState(Arrays.asList(col));
+ p = table.getIndexedColumnsTableState(Arrays.asList(col), false);
s = p.getFirst();
assertEquals("Lost already loaded update!", storedKv, s.next());
Mockito.verify(env, Mockito.times(1)).getRegion();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/046bda34/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
index fc3a976..b8fa72d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/example/TestCoveredColumnIndexCodec.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
@@ -138,7 +138,7 @@ public class TestCoveredColumnIndexCodec {
}
@Override
- public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+ public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover, boolean preMutationStateOnly)
throws IOException {
return r;
}