You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mi...@apache.org on 2019/05/10 08:30:42 UTC

[phoenix] branch 4.14-HBase-1.2 updated: PHOENIX-5266 Client can only write on Index Table and skip data table if failure happens because of region split/move etc

This is an automated email from the ASF dual-hosted git repository.

mihir6692 pushed a commit to branch 4.14-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.2 by this push:
     new 11e8b70  PHOENIX-5266 Client can only write on Index Table and skip data table if failure happens because of region split/move etc
11e8b70 is described below

commit 11e8b703d9f9c9647e7b7c6c594a7546b980f32d
Author: Monani Mihir <mi...@apache.org>
AuthorDate: Fri May 10 14:00:04 2019 +0530

    PHOENIX-5266 Client can only write on Index Table and skip data table if failure happens because of region split/move etc
---
 .../apache/phoenix/end2end/MutationStateIT.java    | 242 +++++++++++++++++++++
 .../org/apache/phoenix/execute/MutationState.java  |  21 +-
 2 files changed, 257 insertions(+), 6 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index 6030caa..7f4d9a4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -21,16 +21,30 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
+import java.math.BigInteger;
+import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
 import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.Repeat;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
 public class MutationStateIT extends ParallelStatsDisabledIT {
@@ -40,6 +54,8 @@ public class MutationStateIT extends ParallelStatsDisabledIT {
             + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK "
             + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE";
 
+    private static final Random RAND = new Random(5);
+
     private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
         PreparedStatement stmt =
                 conn.prepareStatement("upsert into " + fullTableName
@@ -52,6 +68,232 @@ public class MutationStateIT extends ParallelStatsDisabledIT {
         }
     }
 
+    public static String randString(int length) {
+        return new BigInteger(164, RAND).toString().substring(0, length);
+    }
+
+    private static void mutateRandomly(final String upsertStmt, final String fullTableName,
+            final int nThreads, final int nRows, final int nIndexValues, final int batchSize,
+            final CountDownLatch doneSignal) {
+        Runnable[] runnables = new Runnable[nThreads];
+        for (int i = 0; i < nThreads; i++) {
+            runnables[i] = new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        Connection conn = DriverManager.getConnection(getUrl());
+                        for (int i = 0; i < nRows; i++) {
+                            PreparedStatement statement = conn.prepareStatement(upsertStmt);
+                            int index = 0;
+                            statement.setString(++index, randString(15));
+                            statement.setString(++index, randString(15));
+                            statement.setString(++index, randString(15));
+                            statement.setString(++index, randString(1));
+                            statement.setString(++index, randString(15));
+                            statement.setString(++index, randString(15));
+                            statement.setTimestamp(++index,
+                                new Timestamp(System.currentTimeMillis()));
+                            statement.setTimestamp(++index,
+                                new Timestamp(System.currentTimeMillis()));
+                            statement.setString(++index, randString(1));
+                            statement.setString(++index, randString(1));
+                            statement.setBoolean(++index, false);
+                            statement.setString(++index, randString(1));
+                            statement.setString(++index, randString(1));
+                            statement.setString(++index, randString(15));
+                            statement.setString(++index, randString(15));
+                            statement.setString(++index, randString(15));
+                            statement.setInt(++index, RAND.nextInt());
+                            statement.execute();
+                            if ((i % batchSize) == 0) {
+                                conn.commit();
+                            }
+                        }
+                        conn.commit();
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    } finally {
+                        doneSignal.countDown();
+                    }
+                }
+
+            };
+        }
+        for (int i = 0; i < nThreads; i++) {
+            Thread t = new Thread(runnables[i]);
+            t.start();
+        }
+    }
+
+    @Test
+    @Repeat(10)
+    public void testOnlyIndexTableWriteFromClientSide()
+            throws SQLException, InterruptedException, IOException {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName1 = generateUniqueName();
+        String indexName2 = generateUniqueName();
+        String indexName3 = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName1 = SchemaUtil.getTableName(schemaName, indexName1);
+        String CREATE_DATA_TABLE =
+                "CREATE TABLE IF NOT EXISTS " + fullTableName + " ( \n"
+                        + "    USER1_ID CHAR(15) NOT NULL,\n"
+                        + "    ELEMENT1_ID CHAR(15) NOT NULL,\n"
+                        + "    ELEMENT_ID CHAR(15) NOT NULL,\n"
+                        + "    ELEMENT_TYPE VARCHAR(1) NOT NULL,\n"
+                        + "    TYPE_ID CHAR(15) NOT NULL,\n"
+                        + "    USER_ID CHAR(15) NOT NULL,\n"
+                        + "    ELEMENT4_TIME TIMESTAMP,\n"
+                        + "    ELEMENT_UPDATE TIMESTAMP,\n"
+                        + "    ELEMENT_SCORE DOUBLE,\n"
+                        + "    ELEMENT2_TYPE VARCHAR(1),\n"
+                        + "    ELEMENT1_TYPE VARCHAR(1),\n"
+                        + "    ELEMENT1_IS_SYS_GEN BOOLEAN,\n"
+                        + "    ELEMENT1_STATUS VARCHAR(1),\n"
+                        + "    ELEMENT1_VISIBILITY VARCHAR(1),\n"
+                        + "    ELEMENT3_ID CHAR(15),\n"
+                        + "    ELEMENT4_BY CHAR(15),\n"
+                        + "    BEST_ELEMENT_ID CHAR(15),\n"
+                        + "    ELEMENT_COUNT INTEGER,\n"
+                        + "    CONSTRAINT PK PRIMARY KEY\n"
+                        + "    (\n" + "     USER1_ID,\n"
+                        + "     ELEMENT1_ID,\n"
+                        + "     ELEMENT_ID,\n"
+                        + "     ELEMENT_TYPE,\n"
+                        + "     TYPE_ID,\n"
+                        + "     USER_ID\n" + " )\n"
+                        + " ) VERSIONS=1,MULTI_TENANT=TRUE,TTL=31536000\n";
+
+        String CREATE_INDEX_1 =
+                "CREATE INDEX IF NOT EXISTS " + indexName1 + " \n"
+                        + "     ON " + fullTableName + " (\n"
+                    + "     TYPE_ID,\n"
+                    + "     ELEMENT_ID,\n"
+                    + "     ELEMENT_TYPE,\n"
+                    + "     USER_ID,\n"
+                    + "     ELEMENT4_TIME DESC,\n"
+                    + "     ELEMENT1_ID DESC\n"
+                    + "     ) INCLUDE (\n"
+                    + "     ELEMENT2_TYPE,\n"
+                    + "     ELEMENT1_TYPE,\n"
+                    + "     ELEMENT1_IS_SYS_GEN,\n"
+                    + "     ELEMENT1_STATUS,\n"
+                    + "     ELEMENT1_VISIBILITY,\n"
+                    + "     ELEMENT3_ID,\n"
+                    + "     ELEMENT4_BY,\n"
+                    + "     BEST_ELEMENT_ID,\n"
+                    + "     ELEMENT_COUNT\n"
+                    + "     )\n";
+
+        String CREATE_INDEX_2 =
+                " CREATE INDEX IF NOT EXISTS " + indexName2  + "\n"
+                        + "     ON " + fullTableName + " (\n"
+                    + "     TYPE_ID,\n"
+                    + "     ELEMENT_ID,\n"
+                    + "     ELEMENT_TYPE,\n"
+                    + "     USER_ID,\n"
+                    + "     ELEMENT_UPDATE DESC,\n"
+                    + "     ELEMENT1_ID DESC\n"
+                    + "     ) INCLUDE (\n"
+                    + "     ELEMENT2_TYPE,\n"
+                    + "     ELEMENT1_TYPE,\n"
+                    + "     ELEMENT1_IS_SYS_GEN,\n"
+                    + "     ELEMENT1_STATUS,\n"
+                    + "     ELEMENT1_VISIBILITY,\n"
+                    + "     ELEMENT3_ID,\n"
+                    + "     ELEMENT4_BY,\n"
+                    + "     BEST_ELEMENT_ID,\n"
+                    + "     ELEMENT_COUNT\n"
+                    + "     )\n";
+
+        String CREATE_INDEX_3 =
+                "CREATE INDEX IF NOT EXISTS " + indexName3  + "\n"
+                        + "     ON " + fullTableName + " (\n"
+                    + "     TYPE_ID,\n"
+                    + "     ELEMENT_ID,\n"
+                    + "     ELEMENT_TYPE,\n"
+                    + "     USER_ID,\n"
+                    + "     ELEMENT_SCORE DESC,\n"
+                    + "     ELEMENT1_ID DESC\n"
+                    + "     ) INCLUDE (\n"
+                    + "     ELEMENT2_TYPE,\n"
+                    + "     ELEMENT1_TYPE,\n"
+                    + "     ELEMENT1_IS_SYS_GEN,\n"
+                    + "     ELEMENT1_STATUS,\n"
+                    + "     ELEMENT1_VISIBILITY,\n"
+                    + "     ELEMENT3_ID,\n"
+                    + "     ELEMENT4_BY,\n"
+                    + "     BEST_ELEMENT_ID,\n"
+                    + "     ELEMENT_COUNT\n"
+                    + "     )\n";
+
+        String UPSERT_INTO_DATA_TABLE =
+                "UPSERT INTO " + fullTableName + "\n"
+                        + "(\n" + "    USER1_ID,\n"
+                        + "    ELEMENT1_ID,\n"
+                        + "    ELEMENT_ID,\n"
+                        + "    ELEMENT_TYPE,\n"
+                        + "    TYPE_ID,\n"
+                        + "    USER_ID,\n"
+                        + "    ELEMENT4_TIME,\n"
+                        + "    ELEMENT_UPDATE,\n"
+                        + "    ELEMENT2_TYPE,\n"
+                        + "    ELEMENT1_TYPE,\n"
+                        + "    ELEMENT1_IS_SYS_GEN,\n"
+                        + "    ELEMENT1_STATUS,\n"
+                        + "    ELEMENT1_VISIBILITY,\n"
+                        + "    ELEMENT3_ID,\n"
+                        + "    ELEMENT4_BY,\n"
+                        + "    BEST_ELEMENT_ID,\n"
+                        + "    ELEMENT_COUNT\n" + ")"
+                      + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+        int nThreads = 1;
+        int nRows = 5000;
+        int nIndexValues = 4000;
+        int batchSize = 200;
+        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            try {
+                conn.createStatement().execute(CREATE_DATA_TABLE);
+                conn.createStatement().execute(CREATE_INDEX_1);
+                conn.createStatement().execute(CREATE_INDEX_2);
+                conn.createStatement().execute(CREATE_INDEX_3);
+                conn.commit();
+                mutateRandomly(UPSERT_INTO_DATA_TABLE, fullTableName, nThreads, nRows, nIndexValues,
+                    batchSize, doneSignal);
+                Thread.sleep(200);
+                unassignRegionAsync(fullIndexName1);
+                assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+            } finally {
+
+            }
+            long dataTableRows = TestUtil.getRowCount(conn, fullTableName);
+            ResultSet rs =
+                    conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), null,
+                        new String[] { PTableType.INDEX.toString() });
+            while (rs.next()) {
+                String indexState = rs.getString("INDEX_STATE");
+                String indexName = rs.getString(3);
+                long rowCountIndex =
+                        TestUtil.getRowCount(conn, SchemaUtil.getTableName(schemaName, indexName));
+                if (indexState.equals(PIndexState.ACTIVE.name())) {
+                    assertTrue(dataTableRows == rowCountIndex);
+                } else {
+                    assertTrue(dataTableRows > rowCountIndex);
+                }
+            }
+
+        } catch (InterruptedException e) {
+            throw e;
+        } catch (IOException e) {
+            throw e;
+        }
+    }
+
+
     @Test
     public void testDeleteMaxMutationSize() throws SQLException {
         String tableName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 1cbc4bc..289adac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -47,8 +47,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.htrace.Span;
 import org.apache.htrace.TraceScope;
-import org.apache.phoenix.cache.IndexMetaDataCache;
-import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.MutationPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -935,6 +933,8 @@ public class MutationState implements SQLCloseable {
                 TableInfo tableInfo = pair.getKey();
                 byte[] htableName = tableInfo.getHTableName().getBytes();
                 List<Mutation> mutationList = pair.getValue();
+                List<List<Mutation>> mutationBatchList =
+                        getMutationBatchList(batchSize, batchSizeBytes, mutationList);
 
                 // create a span per target table
                 // TODO maybe we can be smarter about the table name to string here?
@@ -978,9 +978,9 @@ public class MutationState implements SQLCloseable {
 
                         startTime = System.currentTimeMillis();
                         child.addTimelineAnnotation("Attempt " + retryCount);
-                        List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes,
-                                mutationList);
-                        for (final List<Mutation> mutationBatch : mutationBatchList) {
+                        Iterator<List<Mutation>> itrListMutation = mutationBatchList.iterator();
+                        while (itrListMutation.hasNext()) {
+                            final List<Mutation> mutationBatch = itrListMutation.next();
                             if (shouldRetryIndexedMutation) {
                                 // if there was an index write failure, retry the mutation in a loop
                                 final Table finalHTable = hTable;
@@ -1028,9 +1028,17 @@ public class MutationState implements SQLCloseable {
                                         return ioe;
                                     }
                                 }, iwe, connection, connection.getQueryServices().getProps());
+                                shouldRetryIndexedMutation = false;
                             } else {
                                 hTable.batch(mutationBatch);
                             }
+                            // remove each batch from the list once it gets applied
+                            // so when failures happens for any batch we only start
+                            // from that batch only instead of doing duplicate reply of already
+                            // applied batches from entire list, also we can set
+                            // REPLAY_ONLY_INDEX_WRITES for first batch
+                            // only in case of 1121 SQLException
+                            itrListMutation.remove();
 
                             batchCount++;
                             if (logger.isDebugEnabled())
@@ -1081,7 +1089,8 @@ public class MutationState implements SQLCloseable {
                                 if (iwe != null && !shouldRetryIndexedMutation) {
                                     // For an index write failure, the data table write succeeded,
                                     // so when we retry we need to set REPLAY_WRITES
-                                    for (Mutation m : mutationList) {
+                                    // for first batch in list only.
+                                    for (Mutation m : mutationBatchList.get(0)) {
                                         if (!PhoenixIndexMetaData.
                                                 isIndexRebuild(m.getAttributesMap())) {
                                             m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,