You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/06 00:42:28 UTC

[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new e64ff2d  PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)
e64ff2d is described below

commit e64ff2d01dbbae388d6bd9d9cd060af10c7b41ed
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sat Oct 5 16:30:07 2019 -0700

    PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)
---
 .../org/apache/phoenix/util/IndexScrutinyIT.java   | 12 +---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 17 +++++-
 .../apache/phoenix/index/GlobalIndexChecker.java   | 70 ++++++++++++++++++++--
 3 files changed, 82 insertions(+), 17 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index 20ec965..8af61c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -91,17 +91,11 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
             conn.commit();
 
-            // Writing index directly will generate unverified rows with no corresponding data rows. These rows will not be visible to the applications
+            // Writing index directly will generate unverified rows. These rows will be recovered if the
+            // corresponding data row exists
             conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')");
             conn.commit();
-            try {
-                IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals("Expected data table row count to match expected:<2> but was:<1>", e.getMessage());
-            }
+            assertEquals(2, IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName));
         }
     }
-
-
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 7c37c7a..1820194 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -159,6 +159,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the
       // row (i.e., concurrent updates).
       private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
+      private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
 
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
@@ -374,18 +375,27 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-  private void lockRows(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) throws IOException {
+  private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) {
       for (int i = 0; i < miniBatchOp.size(); i++) {
           if (miniBatchOp.getOperationStatus(i) == IGNORE) {
               continue;
           }
           Mutation m = miniBatchOp.getOperation(i);
           if (this.builder.isEnabled(m)) {
-              context.rowLocks.add(lockManager.lockRow(m.getRow(), rowLockWaitDuration));
+              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+              if (!context.rowsToLock.contains(row)) {
+                  context.rowsToLock.add(row);
+              }
           }
       }
   }
 
+  private void lockRows(BatchMutateContext context) throws IOException {
+      for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+          context.rowLocks.add(lockManager.lockRow(rowKey, rowLockWaitDuration));
+      }
+  }
+
   private void populatePendingRows(BatchMutateContext context, long now) {
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
@@ -598,7 +608,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
        * while determining the index updates
        */
       if (replayWrite == null) {
-          lockRows(miniBatchOp, context);
+          populateRowsToLock(miniBatchOp, context);
+          lockRows(context);
       }
       long now = EnvironmentEdgeManager.currentTimeMillis();
       // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 6b3b368..fac5cb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
@@ -84,6 +86,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
         private Scan scan;
         private Scan indexScan;
         private Scan singleRowIndexScan;
+        private Scan singleRowDataScan;
         private Scan buildIndexScan = null;
         private Table dataHTable = null;
         private byte[] emptyCF;
@@ -207,22 +210,71 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             }
         }
 
-        private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts) throws IOException {
+        private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts, boolean specific) throws IOException {
             if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
                 Delete del = new Delete(indexRowKey, ts);
-                // We are deleting a specific version of a row so the flowing loop is for that
-                for (Cell cell : row) {
-                    del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
+                if (specific) {
+                    // We are deleting a specific version of a row so the following loop is for that
+                    for (Cell cell : row) {
+                        del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
+                    }
                 }
                 Mutation[] mutations = new Mutation[]{del};
                 region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
             }
         }
 
+        private boolean doesDataRowExist(byte[] indexRowKey, byte[] dataRowKey) throws IOException {
+            singleRowDataScan.setStartRow(dataRowKey);
+            singleRowDataScan.setStopRow(dataRowKey);
+            singleRowDataScan.setTimeRange(0, maxTimestamp);
+            try (ResultScanner resultScanner = dataHTable.getScanner(singleRowDataScan)) {
+                final Result result = resultScanner.next();
+                if (result == null) {
+                    // There is no data table row for this unverified index row. We need to skip it.
+                    return false;
+                }
+                else {
+                    ValueGetter getter = new ValueGetter() {
+                        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+
+                        @Override
+                        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+                            Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+                            if (cell == null) {
+                                return null;
+                            }
+                            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                            return valuePtr;
+                        }
+
+                        @Override
+                        public byte[] getRowKey() {
+                            return result.getRow();
+                        }
+                    };
+                    for (Cell cell : result.rawCells()) {
+                        String cellString = cell.toString();
+                        LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+                    }
+                    byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
+                    if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
+                            indexRowKey, 0, indexRowKey.length) != 0) {
+                        // The row key of the index row that has been built is different than the row key of the unverified
+                        // index row
+                        return false;
+                    }
+                }
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+            }
+            return true;
+        }
         private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
             // Build the data table row key from the index table row key
             if (buildIndexScan == null) {
                 buildIndexScan = new Scan();
+                singleRowDataScan = new Scan();
                 indexScan = new Scan(scan);
                 singleRowIndexScan = new Scan(scan);
                 byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
@@ -262,6 +314,14 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             } catch (Throwable t) {
                 ServerUtil.throwIOException(dataHTable.getName().toString(), t);
             }
+            if (!doesDataRowExist(indexRowKey, dataRowKey)) {
+                // Delete the unverified row from index if it is old enough
+                deleteRowIfAgedEnough(indexRowKey, row, ts, false);
+                // Skip this unverified row (i.e., do not return it to the client). Just returning an empty row is
+                // sufficient to do that
+                row.clear();
+                return;
+            }
             // Close the current scanner as the newly build row will not be visible to it
             scanner.close();
             // Open a new scanner starting from the current row
@@ -286,7 +346,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 // the data table row is masked by this unverified row, or that the corresponding data table row does
                 // exist
                 // First delete the unverified row from index if it is old enough
-                deleteRowIfAgedEnough(indexRowKey, row, ts);
+                deleteRowIfAgedEnough(indexRowKey, row, ts, true);
                 // Now we will do a single row scan to retrieve the verified index row build from the data table row
                 // if such an index row exists. Note we cannot read all versions in one scan as the max number of row
                 // versions for an index table can be 1. In that case, we will get only one (i.e., the most recent