You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") on the original archive page and is not preserved in this plain text version.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/25 20:38:40 UTC

[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5539 Full row index write at the last write phase for mutable global indexes

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
     new e73eb14  PHOENIX-5539 Full row index write at the last write phase for mutable global indexes
e73eb14 is described below

commit e73eb147ab24d078e1baf5c045428838f7175855
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Oct 24 01:10:32 2019 -0700

    PHOENIX-5539 Full row index write at the last write phase for mutable global indexes
---
 .../end2end/index/GlobalIndexCheckerIT.java        | 50 ++++++++++++++++
 .../phoenix/hbase/index/IndexRegionObserver.java   | 68 ++++++++++++----------
 2 files changed, 87 insertions(+), 31 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 63c6e75..7c823ea 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import static org.apache.phoenix.end2end.index.ImmutableIndexIT.verifyRowsForEmptyColValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -33,11 +34,19 @@ import java.util.Map;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.end2end.IndexToolIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -257,6 +266,47 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    public static void checkUnverifiedCellCount(Connection conn, String indexTableName) throws Exception {
+        Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(Bytes.toBytes(indexTableName));
+        long indexCnt = TestUtil.getRowCount(hIndexTable, false);
+        assertEquals(1, indexCnt);
+        assertEquals(true, verifyRowsForEmptyColValue(conn, indexTableName, IndexRegionObserver.UNVERIFIED_BYTES));
+        Scan s = new Scan();
+        int cntCellValues = 0;
+        try (ResultScanner scanner = hIndexTable.getScanner(s)) {
+            Result result;
+            while ((result = scanner.next()) != null) {
+                CellScanner cellScanner = result.cellScanner();
+                while (cellScanner.advance()) {
+                    cntCellValues++;
+                }
+            }
+        }
+        assertEquals(1, cntCellValues);
+    }
+    @Test
+    public void testUnverifiedRowIncludesOnlyEmptyCell() throws Exception {
+        String dataTableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " +
+                    dataTableName + " (val1) include (val2, val3)");
+            // Fail the last write phase (post index update); reset in finally so the flag never leaks into other tests
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            try {
+                conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')");
+                conn.commit();
+            } finally {
+                IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            }
+            checkUnverifiedCellCount(conn, indexTableName); // the first phase must not send the full row: count cells
+            verifyTableHealth(conn, dataTableName, indexTableName); // add rows and check everything is still okay
+        }
+    }
+
     @Test
     public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 993ff4b..a41e729 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -500,9 +500,25 @@ public class IndexRegionObserver extends BaseRegionObserver {
       return mutations;
   }
 
+  /** Removes every cell of column emptyCF:emptyCQ from m (not only the first); no-op if m has no cells in emptyCF. */
+  public void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
+      List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
+      if (cellList == null) {
+          return;
+      }
+      // Keep scanning after a match so no stale empty cell survives next to the one the caller adds afterwards
+      for (Iterator<Cell> cellIterator = cellList.iterator(); cellIterator.hasNext();) {
+          Cell cell = cellIterator.next();
+          if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                  emptyCQ, 0, emptyCQ.length) == 0) {
+              cellIterator.remove();
+          }
+      }
+  }
+
   private void prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
                                      MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context,
-                                     Collection<? extends Mutation> mutations) throws Throwable {
+                                     Collection<? extends Mutation> mutations, long now) throws Throwable {
       IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
       if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
           throw new DoNotRetryIOException(
@@ -511,7 +527,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
       List<IndexMaintainer> maintainers = ((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers();
 
-      List<Pair<Mutation, byte[]>> indexUpdatesForDeletes;
       // get the current span, or just use a null-span to avoid a bunch of if statements
       try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
           Span current = scope.getSpan();
@@ -528,7 +543,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
           byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
           Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
           List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
-          indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
+          context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
           context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
           while(indexUpdatesItr.hasNext()) {
               Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
@@ -549,35 +564,30 @@ public class IndexRegionObserver extends BaseRegionObserver {
                   // add the VERIFIED cell, which is the empty cell
                   Mutation m = next.getFirst().getFirst();
                   boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
-                  long ts = getMaxTimestamp(m);
                   if (rebuild) {
                       if (m instanceof Put) {
+                          long ts = getMaxTimestamp(m);
+                          // Remove the empty column prepared by Index codec as we need to change its value
+                          removeEmptyColumn(m, emptyCF, emptyCQ);
                           ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
                       }
                   } else {
-                      if (m instanceof Put) {
-                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
-                          // Ignore post index updates (i.e., the third write phase updates) for this row if it is
-                          // going through concurrent updates
-                          ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
-                          if (!context.pendingRows.contains(rowKey)) {
-                              Put put = new Put(m.getRow());
-                              put.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
-                              context.intermediatePostIndexUpdates.add(new Pair<>(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()), next.getSecond()));
-                          }
-                      } else {
-                          // For a delete mutation, first unverify the existing row in the index table and then delete
-                          // the row from the index table after deleting the corresponding row from the data table
-                          indexUpdatesItr.remove();
-                          Put put = new Put(m.getRow());
-                          put.addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES);
-                          indexUpdatesForDeletes.add(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()));
-                          // Ignore post index updates (i.e., the third write phase updates) for this row if it is
-                          // going through concurrent updates
-                          ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
-                          if (!context.pendingRows.contains(rowKey)) {
-                              context.intermediatePostIndexUpdates.add(next);
+                      indexUpdatesItr.remove();
+                      // For this mutation whether it is put or delete, set the status of the index row "unverified"
+                      // This will be done before the data table row is updated (i.e., in the first write phase)
+                      Put unverifiedPut = new Put(m.getRow());
+                      unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES);
+                      context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
+                      // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+                      // going through concurrent updates
+                      ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
+                      if (!context.pendingRows.contains(rowKey)) {
+                          if (m instanceof Put) {
+                              // Remove the empty column prepared by Index codec as we need to change its value
+                              removeEmptyColumn(m, emptyCF, emptyCQ);
+                              ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
                           }
+                          context.intermediatePostIndexUpdates.add(next);
                       }
                   }
               }
@@ -586,10 +596,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
               miniBatchOp.addOperationsFromCP(0,
                       localUpdates.toArray(new Mutation[localUpdates.size()]));
           }
-          if (!indexUpdatesForDeletes.isEmpty()) {
-              context.preIndexUpdates = indexUpdatesForDeletes;
-          }
-
           if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
               context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
           }
@@ -628,7 +634,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
 
       long start = EnvironmentEdgeManager.currentTimeMillis();
-      prepareIndexMutations(c, miniBatchOp, context, mutations);
+      prepareIndexMutations(c, miniBatchOp, context, mutations, now);
       metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
 
       // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to