You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/06 06:11:09 UTC

[phoenix] branch 4.14-HBase-1.4 updated (097b9f2 -> 958270a)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a change to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


 discard 097b9f2  PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)
     new 958270a  PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes (`git push --force`) a change, leaving a
history like this:

 * -- * -- B -- O -- O -- O   (097b9f2)
            \
             N -- N -- N   refs/heads/4.14-HBase-1.4 (958270a)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java | 1 -
 1 file changed, 1 deletion(-)


[phoenix] 01/01: PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 958270a9c96255fbd8e54df0ac2f10c29f8405e3
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sat Oct 5 16:30:07 2019 -0700

    PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)
---
 .../end2end/index/GlobalMutableNonTxIndexIT.java   |  1 -
 .../org/apache/phoenix/util/IndexScrutinyIT.java   | 12 +---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 17 +++++-
 .../apache/phoenix/index/GlobalIndexChecker.java   | 70 ++++++++++++++++++++--
 4 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
index 6704b5b..d507088 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalMutableNonTxIndexIT.java
@@ -27,7 +27,6 @@ public class GlobalMutableNonTxIndexIT extends BaseIndexIT {
 
     public GlobalMutableNonTxIndexIT(boolean localIndex, boolean mutable, boolean transactional, boolean columnEncoded, boolean skipPostIndexUpdates) {
         super(localIndex, mutable, transactional, columnEncoded);
-        IndexRegionObserver.setSkipPostIndexUpdatesForTesting(skipPostIndexUpdates);
     }
 
     @Parameters(name="GlobalMutableNonTxIndexIT_localIndex={0},mutable={1},transactional={2},columnEncoded={3},skipPostIndexUpdates={4}") // name is used by failsafe as file name in reports
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index 20ec965..8af61c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -91,17 +91,11 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
             conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
             conn.commit();
 
-            // Writing index directly will generate unverified rows with no corresponding data rows. These rows will not be visible to the applications
+            // Writing index directly will generate unverified rows. These rows will recovered if there exists the
+            // corresponding data row
             conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')");
             conn.commit();
-            try {
-                IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
-                fail();
-            } catch (AssertionError e) {
-                assertEquals("Expected data table row count to match expected:<2> but was:<1>", e.getMessage());
-            }
+            assertEquals(2, IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName));
         }
     }
-
-
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 7c37c7a..1820194 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -159,6 +159,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the
       // row (i.e., concurrent updates).
       private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
+      private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
 
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
@@ -374,18 +375,27 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-  private void lockRows(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) throws IOException {
+  private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) {
       for (int i = 0; i < miniBatchOp.size(); i++) {
           if (miniBatchOp.getOperationStatus(i) == IGNORE) {
               continue;
           }
           Mutation m = miniBatchOp.getOperation(i);
           if (this.builder.isEnabled(m)) {
-              context.rowLocks.add(lockManager.lockRow(m.getRow(), rowLockWaitDuration));
+              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+              if (!context.rowsToLock.contains(row)) {
+                  context.rowsToLock.add(row);
+              }
           }
       }
   }
 
+  private void lockRows(BatchMutateContext context) throws IOException {
+      for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+          context.rowLocks.add(lockManager.lockRow(rowKey, rowLockWaitDuration));
+      }
+  }
+
   private void populatePendingRows(BatchMutateContext context, long now) {
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
@@ -598,7 +608,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
        * while determining the index updates
        */
       if (replayWrite == null) {
-          lockRows(miniBatchOp, context);
+          populateRowsToLock(miniBatchOp, context);
+          lockRows(context);
       }
       long now = EnvironmentEdgeManager.currentTimeMillis();
       // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 6b3b368..304d73e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
@@ -84,6 +86,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
         private Scan scan;
         private Scan indexScan;
         private Scan singleRowIndexScan;
+        private Scan singleRowDataScan;
         private Scan buildIndexScan = null;
         private Table dataHTable = null;
         private byte[] emptyCF;
@@ -207,22 +210,71 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             }
         }
 
-        private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts) throws IOException {
+        private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts, boolean specific) throws IOException {
             if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
                 Delete del = new Delete(indexRowKey, ts);
-                // We are deleting a specific version of a row so the flowing loop is for that
-                for (Cell cell : row) {
-                    del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
+                if (specific) {
+                    // We are deleting a specific version of a row so the flowing loop is for that
+                    for (Cell cell : row) {
+                        del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
+                    }
                 }
                 Mutation[] mutations = new Mutation[]{del};
                 region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
             }
         }
 
+        private boolean doesDataRowExist(byte[] indexRowKey, byte[] dataRowKey) throws IOException {
+            singleRowDataScan.withStartRow(dataRowKey, true);
+            singleRowDataScan.withStopRow(dataRowKey, true);
+            singleRowDataScan.setTimeRange(0, maxTimestamp);
+            try (ResultScanner resultScanner = dataHTable.getScanner(singleRowDataScan)) {
+                final Result result = resultScanner.next();
+                if (result == null) {
+                    // There is no data table row for this index unverified index row. We need to skip it.
+                    return false;
+                }
+                else {
+                    ValueGetter getter = new ValueGetter() {
+                        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+
+                        @Override
+                        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+                            Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+                            if (cell == null) {
+                                return null;
+                            }
+                            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+                            return valuePtr;
+                        }
+
+                        @Override
+                        public byte[] getRowKey() {
+                            return result.getRow();
+                        }
+                    };
+                    for (Cell cell : result.rawCells()) {
+                        String cellString = cell.toString();
+                        LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+                    }
+                    byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
+                    if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
+                            indexRowKey, 0, indexRowKey.length) != 0) {
+                        // The row key of the index row that has been built is different than the row key of the unverified
+                        // index row
+                        return false;
+                    }
+                }
+            } catch (Throwable t) {
+                ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+            }
+            return true;
+        }
         private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
             // Build the data table row key from the index table row key
             if (buildIndexScan == null) {
                 buildIndexScan = new Scan();
+                singleRowDataScan = new Scan();
                 indexScan = new Scan(scan);
                 singleRowIndexScan = new Scan(scan);
                 byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
@@ -262,6 +314,14 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             } catch (Throwable t) {
                 ServerUtil.throwIOException(dataHTable.getName().toString(), t);
             }
+            if (!doesDataRowExist(indexRowKey, dataRowKey)) {
+                // Delete the unverified row from index if it is old enough
+                deleteRowIfAgedEnough(indexRowKey, row, ts, false);
+                // Skip this unverified row (i.e., do not return it to the client). Just retuning empty row is
+                // sufficient to do that
+                row.clear();
+                return;
+            }
             // Close the current scanner as the newly build row will not be visible to it
             scanner.close();
             // Open a new scanner starting from the current row
@@ -286,7 +346,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 // the data table row is masked by this unverified row, or that the corresponding data table row does
                 // exist
                 // First delete the unverified row from index if it is old enough
-                deleteRowIfAgedEnough(indexRowKey, row, ts);
+                deleteRowIfAgedEnough(indexRowKey, row, ts, true);
                 // Now we will do a single row scan to retrieve the verified index row build from the data table row
                 // if such an index row exists. Note we cannot read all versions in one scan as the max number of row
                 // versions for an index table can be 1. In that case, we will get only one (i.e., the most recent