You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mi...@apache.org on 2019/05/10 06:20:58 UTC
[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5266 Client can
only write on Index Table and skip data table if failure happens because of
region split/move etc
This is an automated email from the ASF dual-hosted git repository.
mihir6692 pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
new 21ea99e PHOENIX-5266 Client can only write on Index Table and skip data table if failure happens because of region split/move etc
21ea99e is described below
commit 21ea99ea22fc3e760d47d5e78f525505e7c08abb
Author: Monani Mihir <mm...@salesforce.com>
AuthorDate: Fri May 10 11:50:38 2019 +0530
PHOENIX-5266 Client can only write on Index Table and skip data table if failure happens because of region split/move etc
---
.../apache/phoenix/end2end/MutationStateIT.java | 244 +++++++++++++++++++++
.../org/apache/phoenix/execute/MutationState.java | 19 +-
2 files changed, 259 insertions(+), 4 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index fbcf320..c71e2e8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -21,12 +21,20 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
import java.util.Iterator;
import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
@@ -37,8 +45,17 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.Repeat;
+import org.apache.phoenix.util.RunUntilFailure;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
+import org.junit.runner.RunWith;
+@RunWith(RunUntilFailure.class)
public class MutationStateIT extends ParallelStatsDisabledIT {
private static final String DDL =
@@ -46,6 +63,8 @@ public class MutationStateIT extends ParallelStatsDisabledIT {
+ "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK "
+ "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE";
+ private static final Random RAND = new Random(5);
+
private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
PreparedStatement stmt =
conn.prepareStatement("upsert into " + fullTableName
@@ -58,6 +77,231 @@ public class MutationStateIT extends ParallelStatsDisabledIT {
}
}
+
+    /**
+     * Returns the first {@code length} characters of the decimal rendering of a random
+     * non-negative integer of up to 164 bits drawn from the shared {@code RAND} generator.
+     *
+     * @param length number of leading decimal digits to keep
+     * @return a string made of {@code length} decimal digits
+     * @throws StringIndexOutOfBoundsException if {@code length} is negative or larger than
+     *         the number of decimal digits of the generated value
+     */
+    public static String randString(int length) {
+        final BigInteger randomValue = new BigInteger(164, RAND);
+        final String decimalDigits = randomValue.toString();
+        return decimalDigits.substring(0, length);
+    }
+
+ private static void mutateRandomly(final String upsertStmt, final String fullTableName,
+ final int nThreads, final int nRows, final int nIndexValues, final int batchSize,
+ final CountDownLatch doneSignal) {
+ Runnable[] runnables = new Runnable[nThreads];
+ for (int i = 0; i < nThreads; i++) {
+ runnables[i] = new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ for (int i = 0; i < nRows; i++) {
+ PreparedStatement statement = conn.prepareStatement(upsertStmt);
+ int index = 0;
+ statement.setString(++index, randString(15));
+ statement.setString(++index, randString(15));
+ statement.setString(++index, randString(15));
+ statement.setString(++index, randString(1));
+ statement.setString(++index, randString(15));
+ statement.setString(++index, randString(15));
+ statement.setTimestamp(++index,
+ new Timestamp(System.currentTimeMillis()));
+ statement.setTimestamp(++index,
+ new Timestamp(System.currentTimeMillis()));
+ statement.setString(++index, randString(1));
+ statement.setString(++index, randString(1));
+ statement.setBoolean(++index, false);
+ statement.setString(++index, randString(1));
+ statement.setString(++index, randString(1));
+ statement.setString(++index, randString(15));
+ statement.setString(++index, randString(15));
+ statement.setString(++index, randString(15));
+ statement.setInt(++index, RAND.nextInt());
+ statement.execute();
+ if ((i % batchSize) == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ }
+ for (int i = 0; i < nThreads; i++) {
+ Thread t = new Thread(runnables[i]);
+ t.start();
+ }
+ }
+
+    /**
+     * Regression test for PHOENIX-5266: writes random rows into a table with three indexes
+     * from a background thread while {@code unassignRegionAsync} is invoked on the first
+     * index table, then compares row counts.
+     *
+     * <p>After the writer finishes, every index reported by the metadata for the schema is
+     * checked: an index whose state is {@code ACTIVE} must hold exactly as many rows as the
+     * data table, and any other index must hold fewer rows than the data table.
+     *
+     * @throws SQLException         if any DDL, metadata lookup or row count query fails
+     * @throws InterruptedException if the test thread is interrupted while sleeping or
+     *                              waiting for the writer
+     * @throws IOException          declared for {@code unassignRegionAsync}
+     */
+    @Test
+    @Repeat(5)
+    public void testOnlyIndexTableWriteFromClientSide()
+            throws SQLException, InterruptedException, IOException {
+        String schemaName = generateUniqueName();
+        String tableName = generateUniqueName();
+        String indexName1 = generateUniqueName();
+        String indexName2 = generateUniqueName();
+        String indexName3 = generateUniqueName();
+        String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        String fullIndexName1 = SchemaUtil.getTableName(schemaName, indexName1);
+        String createDataTable =
+                "CREATE TABLE IF NOT EXISTS " + fullTableName + " ( \n"
+                        + " USER1_ID CHAR(15) NOT NULL,\n"
+                        + " ELEMENT1_ID CHAR(15) NOT NULL,\n"
+                        + " ELEMENT_ID CHAR(15) NOT NULL,\n"
+                        + " ELEMENT_TYPE VARCHAR(1) NOT NULL,\n"
+                        + " TYPE_ID CHAR(15) NOT NULL,\n"
+                        + " USER_ID CHAR(15) NOT NULL,\n"
+                        + " ELEMENT4_TIME TIMESTAMP,\n"
+                        + " ELEMENT_UPDATE TIMESTAMP,\n"
+                        + " ELEMENT_SCORE DOUBLE,\n"
+                        + " ELEMENT2_TYPE VARCHAR(1),\n"
+                        + " ELEMENT1_TYPE VARCHAR(1),\n"
+                        + " ELEMENT1_IS_SYS_GEN BOOLEAN,\n"
+                        + " ELEMENT1_STATUS VARCHAR(1),\n"
+                        + " ELEMENT1_VISIBILITY VARCHAR(1),\n"
+                        + " ELEMENT3_ID CHAR(15),\n"
+                        + " ELEMENT4_BY CHAR(15),\n"
+                        + " BEST_ELEMENT_ID CHAR(15),\n"
+                        + " ELEMENT_COUNT INTEGER,\n"
+                        + " CONSTRAINT PK PRIMARY KEY\n"
+                        + " (\n" + " USER1_ID,\n"
+                        + " ELEMENT1_ID,\n"
+                        + " ELEMENT_ID,\n"
+                        + " ELEMENT_TYPE,\n"
+                        + " TYPE_ID,\n"
+                        + " USER_ID\n" + " )\n"
+                        + " ) VERSIONS=1,MULTI_TENANT=TRUE,TTL=31536000\n";
+
+        String createIndex1 =
+                "CREATE INDEX IF NOT EXISTS " + indexName1 + " \n"
+                        + " ON " + fullTableName + " (\n"
+                        + " TYPE_ID,\n"
+                        + " ELEMENT_ID,\n"
+                        + " ELEMENT_TYPE,\n"
+                        + " USER_ID,\n"
+                        + " ELEMENT4_TIME DESC,\n"
+                        + " ELEMENT1_ID DESC\n"
+                        + " ) INCLUDE (\n"
+                        + " ELEMENT2_TYPE,\n"
+                        + " ELEMENT1_TYPE,\n"
+                        + " ELEMENT1_IS_SYS_GEN,\n"
+                        + " ELEMENT1_STATUS,\n"
+                        + " ELEMENT1_VISIBILITY,\n"
+                        + " ELEMENT3_ID,\n"
+                        + " ELEMENT4_BY,\n"
+                        + " BEST_ELEMENT_ID,\n"
+                        + " ELEMENT_COUNT\n"
+                        + " )\n";
+
+        String createIndex2 =
+                " CREATE INDEX IF NOT EXISTS " + indexName2 + "\n"
+                        + " ON " + fullTableName + " (\n"
+                        + " TYPE_ID,\n"
+                        + " ELEMENT_ID,\n"
+                        + " ELEMENT_TYPE,\n"
+                        + " USER_ID,\n"
+                        + " ELEMENT_UPDATE DESC,\n"
+                        + " ELEMENT1_ID DESC\n"
+                        + " ) INCLUDE (\n"
+                        + " ELEMENT2_TYPE,\n"
+                        + " ELEMENT1_TYPE,\n"
+                        + " ELEMENT1_IS_SYS_GEN,\n"
+                        + " ELEMENT1_STATUS,\n"
+                        + " ELEMENT1_VISIBILITY,\n"
+                        + " ELEMENT3_ID,\n"
+                        + " ELEMENT4_BY,\n"
+                        + " BEST_ELEMENT_ID,\n"
+                        + " ELEMENT_COUNT\n"
+                        + " )\n";
+
+        String createIndex3 =
+                "CREATE INDEX IF NOT EXISTS " + indexName3 + "\n"
+                        + " ON " + fullTableName + " (\n"
+                        + " TYPE_ID,\n"
+                        + " ELEMENT_ID,\n"
+                        + " ELEMENT_TYPE,\n"
+                        + " USER_ID,\n"
+                        + " ELEMENT_SCORE DESC,\n"
+                        + " ELEMENT1_ID DESC\n"
+                        + " ) INCLUDE (\n"
+                        + " ELEMENT2_TYPE,\n"
+                        + " ELEMENT1_TYPE,\n"
+                        + " ELEMENT1_IS_SYS_GEN,\n"
+                        + " ELEMENT1_STATUS,\n"
+                        + " ELEMENT1_VISIBILITY,\n"
+                        + " ELEMENT3_ID,\n"
+                        + " ELEMENT4_BY,\n"
+                        + " BEST_ELEMENT_ID,\n"
+                        + " ELEMENT_COUNT\n"
+                        + " )\n";
+
+        String upsertIntoDataTable =
+                "UPSERT INTO " + fullTableName + "\n"
+                        + "(\n" + " USER1_ID,\n"
+                        + " ELEMENT1_ID,\n"
+                        + " ELEMENT_ID,\n"
+                        + " ELEMENT_TYPE,\n"
+                        + " TYPE_ID,\n"
+                        + " USER_ID,\n"
+                        + " ELEMENT4_TIME,\n"
+                        + " ELEMENT_UPDATE,\n"
+                        + " ELEMENT2_TYPE,\n"
+                        + " ELEMENT1_TYPE,\n"
+                        + " ELEMENT1_IS_SYS_GEN,\n"
+                        + " ELEMENT1_STATUS,\n"
+                        + " ELEMENT1_VISIBILITY,\n"
+                        + " ELEMENT3_ID,\n"
+                        + " ELEMENT4_BY,\n"
+                        + " BEST_ELEMENT_ID,\n"
+                        + " ELEMENT_COUNT\n" + ")"
+                        + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+        int nThreads = 1;
+        int nRows = 5000;
+        int nIndexValues = 4000;
+        int batchSize = 200;
+        final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Create the data table and its three indexes, closing the statement afterwards.
+            try (Statement ddl = conn.createStatement()) {
+                ddl.execute(createDataTable);
+                ddl.execute(createIndex1);
+                ddl.execute(createIndex2);
+                ddl.execute(createIndex3);
+            }
+            conn.commit();
+
+            // Start the background writer, give it a head start, then disturb the first
+            // index table while writes are still in flight.
+            mutateRandomly(upsertIntoDataTable, fullTableName, nThreads, nRows, nIndexValues,
+                batchSize, doneSignal);
+            Thread.sleep(200);
+            unassignRegionAsync(fullIndexName1);
+            assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+
+            long dataTableRows = TestUtil.getRowCount(conn, fullTableName);
+            try (ResultSet rs =
+                    conn.getMetaData().getTables(null, StringUtil.escapeLike(schemaName), null,
+                        new String[] { PTableType.INDEX.toString() })) {
+                while (rs.next()) {
+                    String indexState = rs.getString("INDEX_STATE");
+                    String indexName = rs.getString(3);
+                    long rowCountIndex =
+                            TestUtil.getRowCount(conn,
+                                SchemaUtil.getTableName(schemaName, indexName));
+                    if (indexState.equals(PIndexState.ACTIVE.name())) {
+                        // An ACTIVE index must never be ahead of or behind the data table.
+                        assertEquals("Row count mismatch between data table and ACTIVE index "
+                                + indexName, dataTableRows, rowCountIndex);
+                    } else {
+                        // A non-ACTIVE index is expected to be missing rows.
+                        assertTrue("Expected index " + indexName + " in state " + indexState
+                                + " to have fewer rows than the data table (data="
+                                + dataTableRows + ", index=" + rowCountIndex + ")",
+                            dataTableRows > rowCountIndex);
+                    }
+                }
+            }
+        }
+    }
+
@Test
public void testDeleteMaxMutationSize() throws SQLException {
String tableName = generateUniqueName();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 36d120a..eef36fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -940,6 +940,8 @@ public class MutationState implements SQLCloseable {
TableInfo tableInfo = pair.getKey();
byte[] htableName = tableInfo.getHTableName().getBytes();
List<Mutation> mutationList = pair.getValue();
+ List<List<Mutation>> mutationBatchList =
+ getMutationBatchList(batchSize, batchSizeBytes, mutationList);
// create a span per target table
// TODO maybe we can be smarter about the table name to string here?
@@ -988,9 +990,9 @@ public class MutationState implements SQLCloseable {
startTime = System.currentTimeMillis();
child.addTimelineAnnotation("Attempt " + retryCount);
- List<List<Mutation>> mutationBatchList = getMutationBatchList(batchSize, batchSizeBytes,
- mutationList);
- for (final List<Mutation> mutationBatch : mutationBatchList) {
+ Iterator<List<Mutation>> itrListMutation = mutationBatchList.iterator();
+ while (itrListMutation.hasNext()) {
+ final List<Mutation> mutationBatch = itrListMutation.next();
if (shouldRetryIndexedMutation) {
// if there was an index write failure, retry the mutation in a loop
final Table finalHTable = hTable;
@@ -1038,9 +1040,17 @@ public class MutationState implements SQLCloseable {
return ioe;
}
}, iwe, connection, connection.getQueryServices().getProps());
+ shouldRetryIndexedMutation = false;
} else {
hTable.batch(mutationBatch);
}
+ // Remove each batch from the list once it has been applied, so
+ // that when a failure happens for any batch we restart from that
+ // batch instead of replaying the already applied batches from the
+ // entire list. This also means we can set
+ // REPLAY_ONLY_INDEX_WRITES for the first batch only
+ // in case of a 1121 SQLException.
+ itrListMutation.remove();
batchCount++;
if (logger.isDebugEnabled())
@@ -1091,7 +1101,8 @@ public class MutationState implements SQLCloseable {
if (iwe != null && !shouldRetryIndexedMutation) {
// For an index write failure, the data table write succeeded,
// so when we retry we need to set REPLAY_WRITES
- for (Mutation m : mutationList) {
+ // for first batch in list only.
+ for (Mutation m : mutationBatchList.get(0)) {
if (!PhoenixIndexMetaData.
isIndexRebuild(m.getAttributesMap())) {
m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,