You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/09/20 18:05:45 UTC
[22/47] phoenix git commit: PHOENIX-3237 Automatic rebuild of
disabled index will fail if indexes of two tables are disabled at the same
time
PHOENIX-3237 Automatic rebuild of disabled index will fail if indexes of two tables are disabled at the same time
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b640b39c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b640b39c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b640b39c
Branch: refs/heads/calcite
Commit: b640b39ce8cbd36fab33c79ea81ed500dd882f99
Parents: 850b02c
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Sep 14 13:40:49 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Sep 14 13:40:49 2016 +0530
----------------------------------------------------------------------
.../end2end/index/MutableIndexFailureIT.java | 239 +++++++++++--------
.../coprocessor/MetaDataRegionObserver.java | 6 +-
2 files changed, 143 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b640b39c/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 5d0230b..0a85216 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -49,7 +49,6 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -128,6 +127,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
}
public void helpTestWriteFailureDisablesIndex() throws Exception {
+ String secondTableName = fullTableName + "_2";
+ String secondIndexName = indexName + "_2";
+ String secondFullIndexName = fullIndexName + "_2";
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
try (Connection conn = driver.connect(url, props)) {
@@ -139,6 +141,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
}
conn.createStatement().execute("CREATE TABLE " + fullTableName
+ " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
+ conn.createStatement().execute("CREATE TABLE " + secondTableName
+ + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
query = "SELECT * FROM " + fullTableName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
@@ -146,34 +150,26 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
FAIL_WRITE = false;
conn.createStatement().execute(
"CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+ conn.createStatement().execute(
+ "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)");
query = "SELECT * FROM " + fullIndexName;
rs = conn.createStatement().executeQuery(query);
assertFalse(rs.next());
// Verify the metadata for index is correct.
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
+ rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName+"%",
new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
assertEquals(indexName, rs.getString(3));
assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+ assertTrue(rs.next());
+ assertEquals(secondIndexName, rs.getString(3));
+ assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
assertFalse(rs.next());
-
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- stmt.setString(1, "a");
- stmt.setString(2, "x");
- stmt.setString(3, "1");
- stmt.execute();
- stmt.setString(1, "b");
- stmt.setString(2, "y");
- stmt.setString(3, "2");
- stmt.execute();
- stmt.setString(1, "c");
- stmt.setString(2, "z");
- stmt.setString(3, "3");
- stmt.execute();
- conn.commit();
-
+ initializeTable(conn, fullTableName);
+ initializeTable(conn, secondTableName);
+
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
@@ -192,31 +188,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
assertFalse(rs.next());
FAIL_WRITE = true;
-
- stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
- // Insert new row
- stmt.setString(1, "d");
- stmt.setString(2, "d");
- stmt.setString(3, "4");
- stmt.execute();
- // Update existing row
- stmt.setString(1, "a");
- stmt.setString(2, "x2");
- stmt.setString(3, "2");
- stmt.execute();
- // Delete existing row
- stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
- stmt.setString(1, "b");
- stmt.execute();
- try {
- conn.commit();
- fail();
- } catch (SQLException e) {
- System.out.println();
- } catch(Exception e) {
- System.out.println();
- }
-
+ updateTable(conn, fullTableName);
+ updateTable(conn, secondTableName);
// Verify the metadata for index is correct.
rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
new String[] { PTableType.INDEX.toString() });
@@ -236,13 +209,18 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
// would have succeeded while the index writes would have failed.
if (!transactional) {
// Verify UPSERT on data table still work after index is disabled
- stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+ stmt.setString(1, "a3");
+ stmt.setString(2, "x3");
+ stmt.setString(3, "3");
+ stmt.execute();
+ conn.commit();
+ stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
stmt.setString(1, "a3");
stmt.setString(2, "x3");
stmt.setString(3, "3");
stmt.execute();
conn.commit();
-
// Verify previous writes succeeded to data table
query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -267,25 +245,18 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
// re-enable index table
FAIL_WRITE = false;
-
- boolean isActive = false;
- if (!transactional) {
- int maxTries = 3, nTries = 0;
- do {
- Thread.sleep(15 * 1000); // sleep 15 secs
- rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
- new String[] { PTableType.INDEX.toString() });
- assertTrue(rs.next());
- if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
- isActive = true;
- break;
- }
- } while(++nTries < maxTries);
- assertTrue(isActive);
- }
+ waitForIndexToBeActive(conn,indexName);
+ waitForIndexToBeActive(conn,secondIndexName);
// Verify UPSERT on data table still work after index table is recreated
- stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+ stmt.setString(1, "a3");
+ stmt.setString(2, "x4");
+ stmt.setString(3, "4");
+ stmt.execute();
+ conn.commit();
+
+ stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
stmt.setString(1, "a3");
stmt.setString(2, "x4");
stmt.setString(3, "4");
@@ -293,48 +264,116 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
conn.commit();
// verify index table has correct data
- query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + fullTableName;
- rs = conn.createStatement().executeQuery("EXPLAIN " + query);
- expectedPlan = " OVER "
- + (localIndex
- ? Bytes.toString(SchemaUtil
- .getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped).getName())
- : SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), isNamespaceMapped).getNameAsString());
- String explainPlan = QueryUtil.getExplainPlan(rs);
- assertTrue(explainPlan.contains(expectedPlan));
- rs = conn.createStatement().executeQuery(query);
- if (transactional) { // failed commit does not get retried
- assertTrue(rs.next());
- assertEquals("a", rs.getString(1));
- assertEquals("x", rs.getString(2));
- assertTrue(rs.next());
- assertEquals("a3", rs.getString(1));
- assertEquals("x4", rs.getString(2));
- assertTrue(rs.next());
- assertEquals("b", rs.getString(1));
- assertEquals("y", rs.getString(2));
- assertTrue(rs.next());
- assertEquals("c", rs.getString(1));
- assertEquals("z", rs.getString(2));
- assertFalse(rs.next());
- } else { // failed commit eventually succeeds
- assertTrue(rs.next());
- assertEquals("d", rs.getString(1));
- assertEquals("d", rs.getString(2));
- assertTrue(rs.next());
- assertEquals("a", rs.getString(1));
- assertEquals("x2", rs.getString(2));
- assertTrue(rs.next());
- assertEquals("a3", rs.getString(1));
- assertEquals("x4", rs.getString(2));
+ validateDataWithIndex(conn, fullTableName, fullIndexName);
+ validateDataWithIndex(conn, secondTableName, secondFullIndexName);
+ }
+ }
+
+ private void waitForIndexToBeActive(Connection conn, String index) throws InterruptedException, SQLException {
+ boolean isActive = false;
+ if (!transactional) {
+ int maxTries = 4, nTries = 0;
+ do {
+ Thread.sleep(15 * 1000); // sleep 15 secs
+ ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), index,
+ new String[] { PTableType.INDEX.toString() });
assertTrue(rs.next());
- assertEquals("c", rs.getString(1));
- assertEquals("z", rs.getString(2));
- assertFalse(rs.next());
- }
+ if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+ isActive = true;
+ break;
+ }
+ } while (++nTries < maxTries);
+ assertTrue(isActive);
+ }
+ }
+
+ private void initializeTable(Connection conn, String tableName) throws SQLException {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
+ stmt.setString(1, "a");
+ stmt.setString(2, "x");
+ stmt.setString(3, "1");
+ stmt.execute();
+ stmt.setString(1, "b");
+ stmt.setString(2, "y");
+ stmt.setString(3, "2");
+ stmt.execute();
+ stmt.setString(1, "c");
+ stmt.setString(2, "z");
+ stmt.setString(3, "3");
+ stmt.execute();
+ conn.commit();
+
+ }
+
+ private void validateDataWithIndex(Connection conn, String tableName, String indexName) throws SQLException {
+ String query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + tableName;
+ ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ String expectedPlan = " OVER "
+ + (localIndex
+ ? Bytes.toString(
+ SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped).getName())
+ : SchemaUtil.getPhysicalTableName(indexName.getBytes(), isNamespaceMapped).getNameAsString());
+ String explainPlan = QueryUtil.getExplainPlan(rs);
+ assertTrue(explainPlan.contains(expectedPlan));
+ rs = conn.createStatement().executeQuery(query);
+ if (transactional) { // failed commit does not get retried
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("x", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a3", rs.getString(1));
+ assertEquals("x4", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString(1));
+ assertEquals("y", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
+ assertFalse(rs.next());
+ } else { // failed commit eventually succeeds
+ assertTrue(rs.next());
+ assertEquals("d", rs.getString(1));
+ assertEquals("d", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString(1));
+ assertEquals("x2", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("a3", rs.getString(1));
+ assertEquals("x4", rs.getString(2));
+ assertTrue(rs.next());
+ assertEquals("c", rs.getString(1));
+ assertEquals("z", rs.getString(2));
+ assertFalse(rs.next());
}
}
+ private void updateTable(Connection conn, String tableName) throws SQLException {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
+ // Insert new row
+ stmt.setString(1, "d");
+ stmt.setString(2, "d");
+ stmt.setString(3, "4");
+ stmt.execute();
+ // Update existing row
+ stmt.setString(1, "a");
+ stmt.setString(2, "x2");
+ stmt.setString(3, "2");
+ stmt.execute();
+ // Delete existing row
+ stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE k=?");
+ stmt.setString(1, "b");
+ stmt.execute();
+ try {
+ conn.commit();
+ fail();
+ } catch (SQLException e) {
+ System.out.println();
+ } catch (Exception e) {
+ System.out.println();
+ }
+
+ }
+
public static class FailingRegionObserver extends SimpleRegionObserver {
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b640b39c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 5beba49..f1dc982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -257,9 +257,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
- if ((dataTable == null || dataTable.length == 0)
- || (indexStat == null || indexStat.length == 0)) {
+ if ((dataTable == null || dataTable.length == 0) || (indexStat == null || indexStat.length == 0)
+ || (dataPTable != null
+ && Bytes.compareTo(dataPTable.getName().getBytes(), dataTable) != 0)) {
// data table name can't be empty
+ // we need to build indexes of same data table. so skip other indexes for this task.
continue;
}