You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mi...@apache.org on 2019/05/11 05:48:52 UTC
[phoenix] branch 4.14-HBase-1.3 updated: PHOENIX-5055 Split
mutations batches probably affects correctness of index data
This is an automated email from the ASF dual-hosted git repository.
mihir6692 pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
new 6d5bca2 PHOENIX-5055 Split mutations batches probably affects correctness of index data
6d5bca2 is described below
commit 6d5bca224c23bea9c6025d6526a2046411f47439
Author: Jaanai <cl...@gmail.com>
AuthorDate: Sat May 11 11:18:41 2019 +0530
PHOENIX-5055 Split mutations batches probably affects correctness of index data
---
.../apache/phoenix/end2end/MutationStateIT.java | 47 +++++++++++++++++++++-
.../org/apache/phoenix/end2end/QueryMoreIT.java | 6 +--
.../org/apache/phoenix/execute/MutationState.java | 41 ++++++++++++++-----
.../apache/phoenix/execute/MutationStateTest.java | 41 +++++++++++++++++++
4 files changed, 122 insertions(+), 13 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index 7f4d9a4..3f3c314 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -26,6 +26,7 @@ import java.math.BigInteger;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.util.Iterator;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -35,6 +36,11 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -447,5 +453,44 @@ public class MutationStateIT extends ParallelStatsDisabledIT {
stmt.execute();
assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize());
}
-
+
+ @Test // PHOENIX-5055: mutations for a single row must not be split across commit batches
+ public void testSplitMutationsIntoSameGroupForSingleRow() throws Exception {
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ Properties props = new Properties();
+ props.put("phoenix.mutate.batchSize", "2"); // 2 is the smallest batchSize getMutationBatchList accepts (batchSize > 1)
+ try (PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class)) {
+ conn.setAutoCommit(false);
+ conn.createStatement().executeUpdate(
+ "CREATE TABLE " + tableName + " ("
+ + "A VARCHAR NOT NULL PRIMARY KEY,"
+ + "B VARCHAR,"
+ + "C VARCHAR,"
+ + "D VARCHAR) COLUMN_ENCODED_BYTES = 0");
+ conn.createStatement().executeUpdate("CREATE INDEX " + indexName + " on " + tableName + " (C) INCLUDE(D)");
+ // Row A3 leaves D null, presumably yielding more than one mutation for that row (TODO confirm: Put + Delete)
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName + "(A,B,C,D) VALUES ('A2','B2','C2','D2')");
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName + "(A,B,C,D) VALUES ('A3','B3', 'C3', null)");
+ conn.commit();
+ // Raw-scan the data table: all cells of a row must share one timestamp, which this test treats as evidence the row was not split across batches
+ Table htable = conn.getQueryServices().getTable(Bytes.toBytes(tableName));
+ Scan scan = new Scan();
+ scan.setRaw(true);
+ Iterator<Result> scannerIter = htable.getScanner(scan).iterator(); // NOTE(review): the ResultScanner is never closed; consider try-with-resources
+ while (scannerIter.hasNext()) {
+ long ts = -1; // -1 = no cell of this row seen yet
+ Result r = scannerIter.next();
+ for (Cell cell : r.listCells()) {
+ if (ts == -1) {
+ ts = cell.getTimestamp();
+ } else {
+ assertEquals("(" + cell.toString() + ") has different ts", ts, cell.getTimestamp());
+ }
+ }
+ }
+ htable.close(); // NOTE(review): skipped if an assertion above fails; try-with-resources would be safer
+ }
+ }
+
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 04272fa..6c8064d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -490,14 +490,14 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
connection.commit();
assertEquals(2L, connection.getMutationState().getBatchCount());
- // set the batch size (rows) to 1
- connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1");
+ // set the batch size (rows) to 2 since there are at least 2 mutations when updating a single row
+ connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "2");
connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128");
connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
upsertRows(connection, fullTableName);
connection.commit();
// each row should be in its own batch
- assertEquals(4L, connection.getMutationState().getBatchCount());
+ assertEquals(2L, connection.getMutationState().getBatchCount());
}
private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 289adac..e54e892 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -102,6 +102,7 @@ import org.apache.phoenix.util.TransactionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -1137,34 +1138,56 @@ public class MutationState implements SQLCloseable {
}
/**
- * Split the list of mutations into multiple lists that don't exceed row and byte thresholds
+ *
+ * Split the list of mutations into batches that respect the mutation-count and byte-size limits.
+ * Since a single row update can produce multiple mutations, consecutive mutations sharing a row key
+ * are treated as one run: the limits are only checked between runs, so mutations for a single row
+ * never end up in different batches. A run that alone exceeds a limit is placed in a batch of its own.
+ * Only adjacent mutations are grouped; callers are expected to keep same-row mutations together.
*
+ * @param batchSize
+ * Maximum number of mutations (not rows) per batch; must be greater than 1
+ * @param batchSizeBytes
+ * Maximum total size per batch, as computed by KeyValueUtil.calculateMutationDiskSize; must be greater than 0
* @param allMutationList
* List of HBase mutations
* @return List of lists of mutations
+ * @throws IllegalArgumentException if batchSize <= 1 or batchSizeBytes <= 0
*/
- public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes,
- List<Mutation> allMutationList) {
+ public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes, List<Mutation> allMutationList) {
+ // NOTE(review): batchSize values <= 1 are now rejected; the message below does not name the violated condition (batchSize > 1).
+ Preconditions.checkArgument(batchSize> 1,
+ "Mutation types are put or delete, for one row all mutations must be in one batch.");
+ Preconditions.checkArgument(batchSizeBytes > 0, "Batch size must be larger than 0"); // NOTE(review): checks batchSizeBytes, but the message says "Batch size"
List<List<Mutation>> mutationBatchList = Lists.newArrayList();
List<Mutation> currentList = Lists.newArrayList();
+ List<Mutation> sameRowList = Lists.newArrayList(); // reused buffer holding the current run of same-row mutations
long currentBatchSizeBytes = 0L;
- for (Mutation mutation : allMutationList) {
- long mutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation);
- if (currentList.size() == batchSize || currentBatchSizeBytes + mutationSizeBytes > batchSizeBytes) {
+ // i is advanced inside the body (by the inner while and once at the end), hence no update clause.
+ for (int i = 0; i < allMutationList.size(); ) {
+ long sameRowBatchSize = 1L; // number of mutations in the current same-row run
+ Mutation mutation = allMutationList.get(i);
+ long sameRowMutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation); // total byte size of the run
+ sameRowList.add(mutation);
+ // Extend the run while the next mutation targets the same row key.
+ while (i + 1 < allMutationList.size() &&
+ Bytes.compareTo(allMutationList.get(i + 1).getRow(), mutation.getRow()) == 0) {
+ Mutation sameRowMutation = allMutationList.get(i + 1);
+ sameRowList.add(sameRowMutation);
+ sameRowMutationSizeBytes += KeyValueUtil.calculateMutationDiskSize(sameRowMutation);
+ sameRowBatchSize++;
+ i++; // i now indexes the last mutation of the run seen so far
+ }
+
+ // Close the current batch if the whole run would push it past either limit; an empty batch is never emitted.
+ if (currentList.size() + sameRowBatchSize > batchSize ||
+ currentBatchSizeBytes + sameRowMutationSizeBytes > batchSizeBytes) {
if (currentList.size() > 0) {
mutationBatchList.add(currentList);
currentList = Lists.newArrayList();
currentBatchSizeBytes = 0L;
}
}
- currentList.add(mutation);
- currentBatchSizeBytes += mutationSizeBytes;
+
+ // The run is always added whole, even when it alone exceeds a limit.
+ currentList.addAll(sameRowList);
+ currentBatchSizeBytes += sameRowMutationSizeBytes;
+ sameRowList.clear(); // safe to reuse: addAll copied the references into currentList
+ i++; // advance to the first mutation of the next run
}
+
if (currentList.size() > 0) {
mutationBatchList.add(currentList);
}
return mutationBatchList;
-
}
public byte[] encodeTransaction() throws SQLException {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
index 8553b73..22662b2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -29,6 +29,9 @@ import java.util.List;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.schema.types.PUnsignedInt;
@@ -36,6 +39,8 @@ import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.Test;
+import com.google.common.collect.ImmutableList;
+
public class MutationStateTest {
@Test
@@ -134,4 +139,40 @@ public class MutationStateTest {
assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(1)))));
}
+
+ /**
+ * Checks how getMutationBatchList groups mutations with batchSize = 2: consecutive mutations
+ * for the same row stay in one batch, and a new batch starts whenever the next row group would
+ * not fit. NOTE(review): assumes the empty Puts/Deletes used here stay within the 10-byte limit,
+ * so only the mutation-count limit drives the splits; the assertEquals calls below pass their
+ * arguments in (actual, expected) order.
+ */
+ @Test
+ public void testGetMutationBatchList() {
+ byte[] r1 = Bytes.toBytes(1);
+ byte[] r2 = Bytes.toBytes(2);
+ byte[] r3 = Bytes.toBytes(3);
+ byte[] r4 = Bytes.toBytes(4);
+ // r1 fits alone; r2's Put+Delete group (2 mutations) does not fit beside it, so it opens a second batch
+ {
+ List<Mutation> list = ImmutableList.of(new Put(r1), new Put(r2), new Delete(r2));
+ List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
+ assertTrue(batchLists.size() == 2);
+ assertEquals(batchLists.get(0).size(), 1);
+ assertEquals(batchLists.get(1).size(), 2);
+ }
+
+ // r1's Put+Delete group fills the first batch; r2 opens the second
+ {
+ List<Mutation> list = ImmutableList.of(new Put(r1), new Delete(r1), new Put(r2));
+ List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
+ assertTrue(batchLists.size() == 2);
+ assertEquals(batchLists.get(0).size(), 2);
+ assertEquals(batchLists.get(1).size(), 1);
+ }
+
+ // Unsorted input: grouping depends only on same-row mutations being adjacent
+ {
+ List<Mutation> list = ImmutableList.of(new Put(r3), new Put(r1), new Delete(r1), new Put(r2), new Put(r4), new Delete(r4));
+ List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
+ assertTrue(batchLists.size() == 4);
+ assertEquals(batchLists.get(0).size(), 1);
+ assertEquals(batchLists.get(1).size(), 2);
+ assertEquals(batchLists.get(2).size(), 1);
+ assertEquals(batchLists.get(3).size(), 2);
+ }
+
+ }
+
}