You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/09/20 18:05:45 UTC

[22/47] phoenix git commit: PHOENIX-3237 Automatic rebuild of disabled index will fail if indexes of two tables are disabled at the same time

PHOENIX-3237 Automatic rebuild of disabled index will fail if indexes of two tables are disabled at the same time


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b640b39c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b640b39c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b640b39c

Branch: refs/heads/calcite
Commit: b640b39ce8cbd36fab33c79ea81ed500dd882f99
Parents: 850b02c
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Sep 14 13:40:49 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Sep 14 13:40:49 2016 +0530

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    | 239 +++++++++++--------
 .../coprocessor/MetaDataRegionObserver.java     |   6 +-
 2 files changed, 143 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b640b39c/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 5d0230b..0a85216 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -49,7 +49,6 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -128,6 +127,9 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
     }
 
     public void helpTestWriteFailureDisablesIndex() throws Exception {
+        String secondTableName = fullTableName + "_2";
+        String secondIndexName = indexName + "_2";
+        String secondFullIndexName = fullIndexName + "_2";
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, String.valueOf(isNamespaceMapped));
         try (Connection conn = driver.connect(url, props)) {
@@ -139,6 +141,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             }
             conn.createStatement().execute("CREATE TABLE " + fullTableName
                     + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
+            conn.createStatement().execute("CREATE TABLE " + secondTableName
+                    + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
             query = "SELECT * FROM " + fullTableName;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
@@ -146,34 +150,26 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             FAIL_WRITE = false;
             conn.createStatement().execute(
                     "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+            conn.createStatement().execute(
+                    "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + secondIndexName + " ON " + secondTableName + " (v1) INCLUDE (v2)");
 
             query = "SELECT * FROM " + fullIndexName;
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
             // Verify the metadata for index is correct.
-            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName+"%",
                     new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
             assertEquals(indexName, rs.getString(3));
             assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+            assertTrue(rs.next());
+            assertEquals(secondIndexName, rs.getString(3));
+            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
             assertFalse(rs.next());
-
-            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            stmt.setString(1, "a");
-            stmt.setString(2, "x");
-            stmt.setString(3, "1");
-            stmt.execute();
-            stmt.setString(1, "b");
-            stmt.setString(2, "y");
-            stmt.setString(3, "2");
-            stmt.execute();
-            stmt.setString(1, "c");
-            stmt.setString(2, "z");
-            stmt.setString(3, "3");
-            stmt.execute();
-            conn.commit();
-
+            initializeTable(conn, fullTableName);
+            initializeTable(conn, secondTableName);
+            
             query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
             rs = conn.createStatement().executeQuery("EXPLAIN " + query);
             String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
@@ -192,31 +188,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             assertFalse(rs.next());
 
             FAIL_WRITE = true;
-
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
-            // Insert new row
-            stmt.setString(1, "d");
-            stmt.setString(2, "d");
-            stmt.setString(3, "4");
-            stmt.execute();
-            // Update existing row
-            stmt.setString(1, "a");
-            stmt.setString(2, "x2");
-            stmt.setString(3, "2");
-            stmt.execute();
-            // Delete existing row
-            stmt = conn.prepareStatement("DELETE FROM " + fullTableName + " WHERE k=?");
-            stmt.setString(1, "b");
-            stmt.execute();
-            try {
-                conn.commit();
-                fail();
-            } catch (SQLException e) {
-                System.out.println();
-            }  catch(Exception e) {
-                System.out.println();
-            }
-
+            updateTable(conn, fullTableName);
+            updateTable(conn, secondTableName);
             // Verify the metadata for index is correct.
             rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
                     new String[] { PTableType.INDEX.toString() });
@@ -236,13 +209,18 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             // would have succeeded while the index writes would have failed.
             if (!transactional) {
                 // Verify UPSERT on data table still work after index is disabled
-                stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+                stmt.setString(1, "a3");
+                stmt.setString(2, "x3");
+                stmt.setString(3, "3");
+                stmt.execute();
+                conn.commit();
+                stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
                 stmt.setString(1, "a3");
                 stmt.setString(2, "x3");
                 stmt.setString(3, "3");
                 stmt.execute();
                 conn.commit();
-
                 // Verify previous writes succeeded to data table
                 query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
                 rs = conn.createStatement().executeQuery("EXPLAIN " + query);
@@ -267,25 +245,18 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
 
             // re-enable index table
             FAIL_WRITE = false;
-            
-            boolean isActive = false;
-            if (!transactional) {
-                int maxTries = 3, nTries = 0;
-                do {
-                    Thread.sleep(15 * 1000); // sleep 15 secs
-                    rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), indexName,
-                            new String[] { PTableType.INDEX.toString() });
-                    assertTrue(rs.next());
-                    if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-                        isActive = true;
-                        break;
-                    }
-                } while(++nTries < maxTries);
-                assertTrue(isActive);
-            }
+            waitForIndexToBeActive(conn,indexName);
+            waitForIndexToBeActive(conn,secondIndexName);
 
             // Verify UPSERT on data table still work after index table is recreated
-            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1, "a3");
+            stmt.setString(2, "x4");
+            stmt.setString(3, "4");
+            stmt.execute();
+            conn.commit();
+            
+            stmt = conn.prepareStatement("UPSERT INTO " + secondTableName + " VALUES(?,?,?)");
             stmt.setString(1, "a3");
             stmt.setString(2, "x4");
             stmt.setString(3, "4");
@@ -293,48 +264,116 @@ public class MutableIndexFailureIT extends BaseOwnClusterHBaseManagedTimeIT {
             conn.commit();
 
             // verify index table has correct data
-            query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + fullTableName;
-            rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            expectedPlan = " OVER "
-                    + (localIndex
-                            ? Bytes.toString(SchemaUtil
-                                    .getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped).getName())
-                            : SchemaUtil.getPhysicalTableName(fullIndexName.getBytes(), isNamespaceMapped).getNameAsString());
-            String explainPlan = QueryUtil.getExplainPlan(rs);
-            assertTrue(explainPlan.contains(expectedPlan));
-            rs = conn.createStatement().executeQuery(query);
-            if (transactional) { // failed commit does not get retried
-                assertTrue(rs.next());
-                assertEquals("a", rs.getString(1));
-                assertEquals("x", rs.getString(2));
-                assertTrue(rs.next());
-                assertEquals("a3", rs.getString(1));
-                assertEquals("x4", rs.getString(2));
-                assertTrue(rs.next());
-                assertEquals("b", rs.getString(1));
-                assertEquals("y", rs.getString(2));
-                assertTrue(rs.next());
-                assertEquals("c", rs.getString(1));
-                assertEquals("z", rs.getString(2));
-                assertFalse(rs.next());
-            } else { // failed commit eventually succeeds
-                assertTrue(rs.next());
-                assertEquals("d", rs.getString(1));
-                assertEquals("d", rs.getString(2));
-                assertTrue(rs.next());
-                assertEquals("a", rs.getString(1));
-                assertEquals("x2", rs.getString(2));
-                assertTrue(rs.next());
-                assertEquals("a3", rs.getString(1));
-                assertEquals("x4", rs.getString(2));
+            validateDataWithIndex(conn, fullTableName, fullIndexName);
+            validateDataWithIndex(conn, secondTableName, secondFullIndexName);
+        }
+    }
+
+    private void waitForIndexToBeActive(Connection conn, String index) throws InterruptedException, SQLException {
+        boolean isActive = false;
+        if (!transactional) {
+            int maxTries = 4, nTries = 0;
+            do {
+                Thread.sleep(15 * 1000); // sleep 15 secs
+                ResultSet rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(schema), index,
+                        new String[] { PTableType.INDEX.toString() });
                 assertTrue(rs.next());
-                assertEquals("c", rs.getString(1));
-                assertEquals("z", rs.getString(2));
-                assertFalse(rs.next());
-            }
+                if (PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
+                    isActive = true;
+                    break;
+                }
+            } while (++nTries < maxTries);
+            assertTrue(isActive);
+        }
+    }
+
+    private void initializeTable(Connection conn, String tableName) throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
+        stmt.setString(1, "a");
+        stmt.setString(2, "x");
+        stmt.setString(3, "1");
+        stmt.execute();
+        stmt.setString(1, "b");
+        stmt.setString(2, "y");
+        stmt.setString(3, "2");
+        stmt.execute();
+        stmt.setString(1, "c");
+        stmt.setString(2, "z");
+        stmt.setString(3, "3");
+        stmt.execute();
+        conn.commit();
+
+    }
+
+    private void validateDataWithIndex(Connection conn, String tableName, String indexName) throws SQLException {
+        String query = "SELECT /*+ INDEX(" + indexName + ") */ k,v1 FROM " + tableName;
+        ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+        String expectedPlan = " OVER "
+                + (localIndex
+                        ? Bytes.toString(
+                                SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped).getName())
+                        : SchemaUtil.getPhysicalTableName(indexName.getBytes(), isNamespaceMapped).getNameAsString());
+        String explainPlan = QueryUtil.getExplainPlan(rs);
+        assertTrue(explainPlan.contains(expectedPlan));
+        rs = conn.createStatement().executeQuery(query);
+        if (transactional) { // failed commit does not get retried
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("x", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("a3", rs.getString(1));
+            assertEquals("x4", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertEquals("z", rs.getString(2));
+            assertFalse(rs.next());
+        } else { // failed commit eventually succeeds
+            assertTrue(rs.next());
+            assertEquals("d", rs.getString(1));
+            assertEquals("d", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("x2", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("a3", rs.getString(1));
+            assertEquals("x4", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("c", rs.getString(1));
+            assertEquals("z", rs.getString(2));
+            assertFalse(rs.next());
         }
     }
     
+    private void updateTable(Connection conn, String tableName) throws SQLException {
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
+        // Insert new row
+        stmt.setString(1, "d");
+        stmt.setString(2, "d");
+        stmt.setString(3, "4");
+        stmt.execute();
+        // Update existing row
+        stmt.setString(1, "a");
+        stmt.setString(2, "x2");
+        stmt.setString(3, "2");
+        stmt.execute();
+        // Delete existing row
+        stmt = conn.prepareStatement("DELETE FROM " + tableName + " WHERE k=?");
+        stmt.setString(1, "b");
+        stmt.execute();
+        try {
+            conn.commit();
+            fail();
+        } catch (SQLException e) {
+            System.out.println();
+        } catch (Exception e) {
+            System.out.println();
+        }
+
+    }
+
     public static class FailingRegionObserver extends SimpleRegionObserver {
         @Override
         public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b640b39c/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 5beba49..f1dc982 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -257,9 +257,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                         PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES);
                     byte[] indexStat = r.getValue(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
-                    if ((dataTable == null || dataTable.length == 0)
-                            || (indexStat == null || indexStat.length == 0)) {
+                    if ((dataTable == null || dataTable.length == 0) || (indexStat == null || indexStat.length == 0)
+                            || (dataPTable != null
+                                    && Bytes.compareTo(dataPTable.getName().getBytes(), dataTable) != 0)) {
                         // data table name can't be empty
+                        // we need to build indexes of same data table. so skip other indexes for this task.
                         continue;
                     }