Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2021/03/26 18:01:09 UTC

[GitHub] [phoenix] gjacoby126 commented on a change in pull request #1183: PHOENIX-6420 Wrong result when conditional and regular upserts are passed in the same commit batch

gjacoby126 commented on a change in pull request #1183:
URL: https://github.com/apache/phoenix/pull/1183#discussion_r601889798



##########
File path: phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
##########
@@ -17,23 +17,7 @@
  */
 package org.apache.phoenix.execute;
 
-import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
+import com.google.common.collect.ImmutableList;

Review comment:
       Good to use the shaded version of Guava for easier porting to 5.x.
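
For context, a minimal sketch of the shaded import being discussed, assuming the relocated package prefix provided by Phoenix's phoenix-thirdparty module (org.apache.phoenix.thirdparty); the class and usage below are illustrative, not part of the PR:

    // Unshaded Guava, as quoted in the hunk above:
    //   import com.google.common.collect.ImmutableList;
    // Shaded (relocated) Guava, which keeps the bundled Guava off the cluster
    // classpath and eases porting between 4.x and 5.x:
    import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;

    import java.util.List;

    public class ShadedGuavaExample {
        public static void main(String[] args) {
            // ImmutableList resolves to the relocated coordinates, so a Guava
            // upgrade in Phoenix does not conflict with HBase's own Guava.
            List<String> tables = ImmutableList.of("MUTATION_TEST1", "MUTATION_TEST2");
            System.out.println(tables);
        }
    }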

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
##########
@@ -404,67 +434,89 @@ public int getNumRows() {
         return numRows;
     }
 
+    private MultiRowMutationState getLastMutationBatch(Map<TableRef, List<MultiRowMutationState>> mutations, TableRef tableRef) {
+        List<MultiRowMutationState> mutationBatches = mutations.get(tableRef);
+        if (mutationBatches == null || mutationBatches.isEmpty()) {
+            return null;
+        }
+        return mutationBatches.get(mutationBatches.size() - 1);
+    }
+
     private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
-            Map<TableRef, MultiRowMutationState> dstMutations) {
+        Map<TableRef, List<MultiRowMutationState>> dstMutations) {
         PTable table = tableRef.getTable();
         boolean isIndex = table.getType() == PTableType.INDEX;
-        boolean incrementRowCount = dstMutations == this.mutations;
-        MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows);
-        if (existingRows != null) { // Rows for that table already exist
-            // Loop through new rows and replace existing with new
-            for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) {
-                // Replace existing row with new row
-                RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
-                if (existingRowMutationState != null) {
-                    Map<PColumn, byte[]> existingValues = existingRowMutationState.getColumnValues();
-                    if (existingValues != PRow.DELETE_MARKER) {
-                        Map<PColumn, byte[]> newRow = rowEntry.getValue().getColumnValues();
-                        // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with
-                        // existing row.
-                        if (newRow != PRow.DELETE_MARKER) {
-                            // decrement estimated size by the size of the old row
-                            estimatedSize -= existingRowMutationState.calculateEstimatedSize();
-                            // Merge existing column values with new column values
-                            existingRowMutationState.join(rowEntry.getValue());
-                            // increment estimated size by the size of the new row
-                            estimatedSize += existingRowMutationState.calculateEstimatedSize();
-                            // Now that the existing row has been merged with the new row, replace it back
-                            // again (since it was merged with the new one above).
-                            existingRows.put(rowEntry.getKey(), existingRowMutationState);
-                        }
-                    }
-                } else {
-                    if (incrementRowCount && !isIndex) { // Don't count index rows in row count
-                        numRows++;
-                        // increment estimated size by the size of the new row
-                        estimatedSize += rowEntry.getValue().calculateEstimatedSize();
-                    }
-                }
-            }
-            // Put the existing one back now that it's merged
-            dstMutations.put(tableRef, existingRows);
-        } else {
+        boolean incrementRowCount = dstMutations == this.mutationsMap;
+        // we only need to check if the new mutation batch (srcRows) conflicts with the
+        // last mutation batch
+        MultiRowMutationState existingRows = getLastMutationBatch(dstMutations, tableRef);
+
+        if (existingRows == null) { // no rows found for this table
             // Size new map at batch size as that's what it'll likely grow to.
             MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize());
             newRows.putAll(srcRows);
-            dstMutations.put(tableRef, newRows);
+            addMutations(dstMutations, tableRef, newRows);
             if (incrementRowCount && !isIndex) {
                 numRows += srcRows.size();
                 // if we added all the rows from newMutationState we can just increment the
                 // estimatedSize by newMutationState.estimatedSize
                 estimatedSize += srcRows.estimatedSize;
             }
+            return;
+        }
+
+        // for conflicting rows
+        MultiRowMutationState conflictingRows = new MultiRowMutationState(connection.getMutateBatchSize());
+
+        // Rows for this table already exist, check for conflicts
+        for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) {
+            ImmutableBytesPtr key = rowEntry.getKey();
+            RowMutationState newRowMutationState = rowEntry.getValue();
+            RowMutationState existingRowMutationState = existingRows.get(key);
+            if (existingRowMutationState == null) {
+                existingRows.put(key, newRowMutationState);
+                if (incrementRowCount && !isIndex) { // Don't count index rows in row count
+                    numRows++;
+                    // increment estimated size by the size of the new row
+                    estimatedSize += newRowMutationState.calculateEstimatedSize();
+                }
+                continue;
+            }
+            Map<PColumn, byte[]> existingValues = existingRowMutationState.getColumnValues();
+            Map<PColumn, byte[]> newValues = newRowMutationState.getColumnValues();
+            if (existingValues != PRow.DELETE_MARKER && newValues != PRow.DELETE_MARKER) {
+                // Check if we can merge existing column values with new column values
+                long beforeMerge = existingRowMutationState.calculateEstimatedSize();

Review comment:
       nit: maybe beforeMergeSize? This reads like a boolean name to me. 
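
A minimal sketch of the rename being suggested, with the surrounding size bookkeeping written the way the pre-existing code in this method handles merges; the exact continuation of the truncated hunk is an assumption, not the PR's code:

    // Capture the row's estimated size before merging, under a name that reads
    // as a size rather than a boolean:
    long beforeMergeSize = existingRowMutationState.calculateEstimatedSize();
    // Merge the new column values into the existing row state.
    existingRowMutationState.join(newRowMutationState);
    // Adjust the running estimate by the delta produced by the merge.
    estimatedSize += existingRowMutationState.calculateEstimatedSize() - beforeMergeSize;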

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
##########
@@ -586,10 +638,11 @@ public boolean hasNext() {
                     // the tables in the mutations map
                     if (!sendAll) {
                         TableRef key = new TableRef(index);
-                        MultiRowMutationState multiRowMutationState = mutations.remove(key);
+                        List<MultiRowMutationState> multiRowMutationState = mutationsMap.remove(key);
                         if (multiRowMutationState != null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
-                            generateMutations(key, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
+                            // for index table there will only be 1 mutation batch in the list

Review comment:
       Why only one mutation batch? What if there are conflicting batches for the index?
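
A hedged sketch of the defensive alternative this question implies, iterating every batch instead of assuming a single one; the names and argument order come from the quoted hunks, but the loop itself is an assumption, not the PR's code:

    // If conflicting batches could ever exist for an index table, generate
    // mutations for each batch rather than only the first.
    for (MultiRowMutationState batch : multiRowMutationState) {
        generateMutations(key, mutationTimestamp, serverTimestamp, batch,
            deleteMutations, null);
    }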

##########
File path: phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
##########
@@ -210,4 +216,63 @@ public void testPendingMutationsOnDDL() throws Exception {
                     + "( id1 UNSIGNED_INT not null primary key," + "appId1 VARCHAR)");
         }
     }
+
+    @Test
+    public void testOnDupAndUpsertInSameCommitBatch() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute(
+                "create table MUTATION_TEST1" +
+                    "( id1 UNSIGNED_INT not null primary key," +
+                    "appId1 VARCHAR)");
+            conn.createStatement().execute(
+                "create table MUTATION_TEST2" +
+                    "( id2 UNSIGNED_INT not null primary key," +
+                    "appId2 VARCHAR)");
+
+            conn.createStatement().execute("upsert into MUTATION_TEST1(id1,appId1) values(111,'app1')");
+            conn.createStatement().execute(
+                "upsert into MUTATION_TEST1(id1,appId1) values(111, 'app1') ON DUPLICATE KEY UPDATE appId1 = null");
+            conn.createStatement().execute("upsert into MUTATION_TEST2(id2,appId2) values(222,'app2')");
+            conn.createStatement().execute(
+                "upsert into MUTATION_TEST2(id2,appId2) values(222,'app2') ON DUPLICATE KEY UPDATE appId2 = null");
+
+            final PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+            MutationState state = pconn.getMutationState();
+            assertEquals(2, state.getNumRows());
+
+            int actualPairs = 0;
+            Iterator<Pair<byte[], List<Mutation>>> mutations = state.toMutations();

Review comment:
       A comment here on what you're checking would be helpful. 
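
A hedged sketch of the kind of inline comment and check the reviewer is asking for; how the truncated test actually continues is not shown, so the loop and the expected count below are assumptions for illustration only:

    // What the loop verifies: each table's regular UPSERT and its ON DUPLICATE
    // KEY UPSERT must stay in separate mutation batches, so toMutations()
    // yields one Pair per batch rather than one per table.
    while (mutations.hasNext()) {
        mutations.next();
        actualPairs++;
    }
    // Two tables x two batch types = 4 pairs (assumed value for illustration;
    // the PR's actual assertion is not quoted here).
    assertEquals(4, actualPairs);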




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org