You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mi...@apache.org on 2019/05/11 05:48:52 UTC

[phoenix] branch 4.14-HBase-1.3 updated: PHOENIX-5055 Split mutations batches probably affects correctness of index data

This is an automated email from the ASF dual-hosted git repository.

mihir6692 pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
     new 6d5bca2  PHOENIX-5055 Split mutations batches probably affects correctness of index data
6d5bca2 is described below

commit 6d5bca224c23bea9c6025d6526a2046411f47439
Author: Jaanai <cl...@gmail.com>
AuthorDate: Sat May 11 11:18:41 2019 +0530

    PHOENIX-5055 Split mutations batches probably affects correctness of index data
---
 .../apache/phoenix/end2end/MutationStateIT.java    | 47 +++++++++++++++++++++-
 .../org/apache/phoenix/end2end/QueryMoreIT.java    |  6 +--
 .../org/apache/phoenix/execute/MutationState.java  | 41 ++++++++++++++-----
 .../apache/phoenix/execute/MutationStateTest.java  | 41 +++++++++++++++++++
 4 files changed, 122 insertions(+), 13 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
index 7f4d9a4..3f3c314 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -26,6 +26,7 @@ import java.math.BigInteger;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.util.Iterator;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -35,6 +36,11 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -447,5 +453,44 @@ public class MutationStateIT extends ParallelStatsDisabledIT {
         stmt.execute();
         assertTrue("Mutation state size should decrease", prevEstimatedSize+4 > state.getEstimatedSize());
     }
-    
+
+    @Test
+    public void testSplitMutationsIntoSameGroupForSingleRow() throws Exception {
+        String tableName = "TBL_" + generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        Properties props = new Properties();
+        props.put("phoenix.mutate.batchSize", "2");
+        try (PhoenixConnection conn = DriverManager.getConnection(getUrl(), props).unwrap(PhoenixConnection.class)) {
+            conn.setAutoCommit(false);
+            conn.createStatement().executeUpdate(
+                    "CREATE TABLE "  + tableName + " ("
+                            + "A VARCHAR NOT NULL PRIMARY KEY,"
+                            + "B VARCHAR,"
+                            + "C VARCHAR,"
+                            + "D VARCHAR) COLUMN_ENCODED_BYTES = 0");
+            conn.createStatement().executeUpdate("CREATE INDEX " + indexName + " on "  + tableName + " (C) INCLUDE(D)");
+
+            conn.createStatement().executeUpdate("UPSERT INTO "  + tableName + "(A,B,C,D) VALUES ('A2','B2','C2','D2')");
+            conn.createStatement().executeUpdate("UPSERT INTO "  + tableName + "(A,B,C,D) VALUES ('A3','B3', 'C3', null)");
+            conn.commit();
+
+            Table htable = conn.getQueryServices().getTable(Bytes.toBytes(tableName));
+            Scan scan = new Scan();
+            scan.setRaw(true);
+            Iterator<Result> scannerIter = htable.getScanner(scan).iterator();
+            while (scannerIter.hasNext()) {
+                long ts = -1;
+                Result r = scannerIter.next();
+                for (Cell cell : r.listCells()) {
+                    if (ts == -1) {
+                        ts = cell.getTimestamp();
+                    } else {
+                        assertEquals("(" + cell.toString() + ") has different ts", ts, cell.getTimestamp());
+                    }
+                }
+            }
+            htable.close();
+        }
+    }
+
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 04272fa..6c8064d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -490,14 +490,14 @@ public class QueryMoreIT extends ParallelStatsDisabledIT {
         connection.commit();
         assertEquals(2L, connection.getMutationState().getBatchCount());
         
-        // set the batch size (rows) to 1 
-        connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "1");
+        // set the batch size (rows) to 2 since there are at least 2 mutations when updating a single row
+        connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, "2");
         connectionProperties.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, "128");
         connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
         upsertRows(connection, fullTableName);
         connection.commit();
         // each row should be in its own batch
-        assertEquals(4L, connection.getMutationState().getBatchCount());
+        assertEquals(2L, connection.getMutationState().getBatchCount());
     }
     
     private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 289adac..e54e892 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -102,6 +102,7 @@ import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -1137,34 +1138,56 @@ public class MutationState implements SQLCloseable {
     }
 
     /**
-     * Split the list of mutations into multiple lists that don't exceed row and byte thresholds
+     *
+     * Split the list of mutations into multiple lists. Since a single row update can contain multiple mutations,
+     * we only check if the current batch has exceeded the row or size limit for different rows,
+     * so that mutations for a single row don't end up in different batches.
      * 
      * @param allMutationList
      *            List of HBase mutations
      * @return List of lists of mutations
      */
-    public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes,
-            List<Mutation> allMutationList) {
+    public static List<List<Mutation>> getMutationBatchList(long batchSize, long batchSizeBytes, List<Mutation> allMutationList) {
+        Preconditions.checkArgument(batchSize> 1,
+                "Mutation types are put or delete, for one row all mutations must be in one batch.");
+        Preconditions.checkArgument(batchSizeBytes > 0, "Batch size must be larger than 0");
         List<List<Mutation>> mutationBatchList = Lists.newArrayList();
         List<Mutation> currentList = Lists.newArrayList();
+        List<Mutation> sameRowList = Lists.newArrayList();
         long currentBatchSizeBytes = 0L;
-        for (Mutation mutation : allMutationList) {
-            long mutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation);
-            if (currentList.size() == batchSize || currentBatchSizeBytes + mutationSizeBytes > batchSizeBytes) {
+        for (int i = 0; i < allMutationList.size(); ) {
+            long sameRowBatchSize = 1L;
+            Mutation mutation = allMutationList.get(i);
+            long sameRowMutationSizeBytes = KeyValueUtil.calculateMutationDiskSize(mutation);
+            sameRowList.add(mutation);
+            while (i + 1 < allMutationList.size() &&
+                    Bytes.compareTo(allMutationList.get(i + 1).getRow(), mutation.getRow()) == 0) {
+                Mutation sameRowMutation = allMutationList.get(i + 1);
+                sameRowList.add(sameRowMutation);
+                sameRowMutationSizeBytes += KeyValueUtil.calculateMutationDiskSize(sameRowMutation);
+                sameRowBatchSize++;
+                i++;
+            }
+
+            if (currentList.size() + sameRowBatchSize > batchSize ||
+                    currentBatchSizeBytes + sameRowMutationSizeBytes > batchSizeBytes) {
                 if (currentList.size() > 0) {
                     mutationBatchList.add(currentList);
                     currentList = Lists.newArrayList();
                     currentBatchSizeBytes = 0L;
                 }
             }
-            currentList.add(mutation);
-            currentBatchSizeBytes += mutationSizeBytes;
+
+            currentList.addAll(sameRowList);
+            currentBatchSizeBytes += sameRowMutationSizeBytes;
+            sameRowList.clear();
+            i++;
         }
+
         if (currentList.size() > 0) {
             mutationBatchList.add(currentList);
         }
         return mutationBatchList;
-
     }
 
     public byte[] encodeTransaction() throws SQLException {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
index 8553b73..22662b2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -29,6 +29,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.schema.types.PUnsignedInt;
@@ -36,6 +39,8 @@ import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableList;
+
 public class MutationStateTest {
 
     @Test
@@ -134,4 +139,40 @@ public class MutationStateTest {
         assertTrue("app2".equals(PVarchar.INSTANCE.toObject(CellUtil.cloneValue(keyValues2.get(1)))));
 
     }
+
+    @Test
+    public void testGetMutationBatchList() {
+        byte[] r1 = Bytes.toBytes(1);
+        byte[] r2 = Bytes.toBytes(2);
+        byte[] r3 = Bytes.toBytes(3);
+        byte[] r4 = Bytes.toBytes(4);
+        // one put and one delete as a group
+        {
+            List<Mutation> list = ImmutableList.of(new Put(r1), new Put(r2), new Delete(r2));
+            List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
+            assertTrue(batchLists.size() == 2);
+            assertEquals(batchLists.get(0).size(), 1);
+            assertEquals(batchLists.get(1).size(), 2);
+        }
+
+        {
+            List<Mutation> list = ImmutableList.of(new Put(r1), new Delete(r1), new Put(r2));
+            List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
+            assertTrue(batchLists.size() == 2);
+            assertEquals(batchLists.get(0).size(), 2);
+            assertEquals(batchLists.get(1).size(), 1);
+        }
+
+        {
+            List<Mutation> list = ImmutableList.of(new Put(r3), new Put(r1), new Delete(r1), new Put(r2), new Put(r4), new Delete(r4));
+            List<List<Mutation>> batchLists = MutationState.getMutationBatchList(2, 10, list);
+            assertTrue(batchLists.size() == 4);
+            assertEquals(batchLists.get(0).size(), 1);
+            assertEquals(batchLists.get(1).size(), 2);
+            assertEquals(batchLists.get(2).size(), 1);
+            assertEquals(batchLists.get(3).size(), 2);
+        }
+
+    }
+
 }