You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/11/15 20:26:33 UTC

[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5564 Restructure read repair to improve readability and correctness

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new 730cdab  PHOENIX-5564 Restructure read repair to improve readability and correctness
730cdab is described below

commit 730cdab3e14cce555954d9e83f177dec3ced81d3
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sat Nov 9 17:05:18 2019 -0800

    PHOENIX-5564 Restructure read repair to improve readability and correctness
---
 .../UngroupedAggregateRegionObserver.java          |  26 ++--
 .../apache/phoenix/index/GlobalIndexChecker.java   | 141 ++++++++++++++-------
 2 files changed, 113 insertions(+), 54 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 8477625..6bcbff7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -97,6 +97,7 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -1140,8 +1141,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 if (!includedColumns.contains(column)) {
                     if (del == null) {
                         Cell cell = row.get(0);
-                        rowKey = new byte[cell.getRowLength()];
-                        System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+                        rowKey = CellUtil.cloneRow(cell);
                         del = new Delete(rowKey);
                     }
                     del.addColumns(column.getFamily(), column.getQualifier(), ts);
@@ -1219,15 +1219,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     del.addDeleteMarker(cell);
                                 }
                             }
-                            if (indexRowKey != null) {
-                                // GlobalIndexChecker passed the index row key. This is to build a single index row.
-                                // Check if the data table row we have just scanned matches with the index row key.
-                                // If not, there is no need to build the index row from this data table row,
-                                // and just return zero row count.
-                                if (!checkIndexRow(indexRowKey, put)) {
-                                    break;
-                                }
-                            }
                             uuidValue = commitIfReady(uuidValue);
                             if (!scan.isRaw()) {
                                 Delete deleteMarkers = generateDeleteMarkers(row);
@@ -1237,6 +1228,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     uuidValue = commitIfReady(uuidValue);
                                 }
                             }
+                            if (indexRowKey != null) {
+                                // GlobalIndexChecker passed the index row key. This is to build a single index row.
+                                // Check if the data table row we have just scanned matches with the index row key.
+                                // If not, there is no need to build the index row from this data table row,
+                                // and just return zero row count.
+                                if (checkIndexRow(indexRowKey, put)) {
+                                    rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+                                }
+                                else {
+                                    rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+                                }
+                                break;
+                            }
                             rowCount++;
                         }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 9cd78b3..9ecf876 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -70,11 +70,43 @@ import org.apache.phoenix.util.ServerUtil;
  * 
  * Coprocessor that verifies the scanned rows of a non-transactional global index.
  *
+ * If an index row is unverified (i.e., the row status is unverified), the following steps are taken:
+ * (1) We generate the data row key from the index row key, and check if the data row exists. If not, this unverified
+ * index row is skipped (i.e., not returned to the client), and it is deleted if it is old enough. The age check is
+ * necessary in order not to delete the index rows that are currently being updated. If the data row exists,
+ * we continue with the rest of the steps.
+ * (2) The index row is rebuilt from the data row.
+ * (3) The current scanner is closed as the newly rebuilt row will not be visible to the current scanner.
+ * (4) If the data row does not point back to the unverified index row (i.e., the index row key generated from the data
+ * row does not match the row key of the unverified index row), this unverified row is skipped and it is
+ * deleted if it is old enough. A new scanner is opened starting from the index row after this unverified index row.
+ * (5) If the data row points back to the unverified index row, a new scanner is opened starting from the index row.
+ * The next row is scanned to check if it is verified. If it is verified, it is returned to the client. If not, then
+ * it means the data table row timestamp is lower than the timestamp of the unverified index row, and
+ * the index row that has been rebuilt from the data table row is masked by this unverified row. This happens if the
+ * first phase updates (i.e., unverified index row updates) complete but the second phase updates (i.e., data table
+ * row updates) fail. There could be back to back such events so we need to scan older versions to retrieve
+ * the verified version that is masked by the unverified version(s).
+ *
  */
 public class GlobalIndexChecker extends BaseRegionObserver {
     private static final Log LOG = LogFactory.getLog(GlobalIndexChecker.class);
     private HTableFactory hTableFactory;
     private GlobalIndexCheckerSource metricsSource;
+    public enum RebuildReturnCode { // outcome of a single index row rebuild, returned as the rebuild scan's row count
+        NO_DATA_ROW(0),       // no data table row exists for the data row key derived from the index row
+        NO_INDEX_ROW(1),      // the data table row exists but does not point back to the index row
+        INDEX_ROW_EXISTS(2);  // the index row rebuilt from the data table row has the passed index row key
+        private final int value; // final: enum constants are shared singletons and must stay immutable
+
+        RebuildReturnCode(int value) {
+            this.value = value;
+        }
+
+        public int getValue() {
+            return value;
+        }
+    }
 
     /**
      * Class that verifies a given row of a non-transactional global index.
@@ -264,8 +296,8 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             buildIndexScan.setStartRow(dataRowKey);
             buildIndexScan.setStopRow(dataRowKey);
             buildIndexScan.setTimeRange(0, maxTimestamp);
-            // Pass the index row key to the partial index builder which will build the index row only when the data
-            // table row for dataRowKey matches with this unverified index row.
+            // Pass the index row key to the partial index builder which will rebuild the index row and check if the
+            // row key of this rebuilt index row matches with the passed index row key
             buildIndexScan.setAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY, indexRowKey);
             Result result = null;
             try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
@@ -275,9 +307,10 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             }
             // A single cell will be returned. We decode that here
             byte[] value = result.value();
-            long rowCount = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(value), SortOrder.getDefault());
-            if (rowCount == 0) {
-                // This means there does not exist a data table row for this unverified index row
+            long code = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(value), SortOrder.getDefault());
+            if (code == RebuildReturnCode.NO_DATA_ROW.getValue()) {
+                // This means there does not exist a data table row for the data row key derived from
+                // this unverified index row. So, no index row has been built
                 // Delete the unverified row from index if it is old enough
                 deleteRowIfAgedEnough(indexRowKey, row, ts, false);
                 // Skip this unverified row (i.e., do not return it to the client). Just retuning empty row is
@@ -285,35 +318,62 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 row.clear();
                 return;
             }
-            // Close the current scanner as the newly build row will not be visible to it
+            // An index row has been built. Close the current scanner as the newly built row will not be visible to it
             scanner.close();
+            if (code == RebuildReturnCode.NO_INDEX_ROW.getValue()) {
+                // This means there exists a data table row for the data row key derived from this unverified index row
+                // but the data table row does not point back to the index row.
+                // Delete the unverified row from index if it is old enough
+                deleteRowIfAgedEnough(indexRowKey, row, ts, false);
+                // Open a new scanner starting from the row after the current row
+                indexScan.setStartRow(indexRowKey);
+                scanner = region.getScanner(indexScan);
+                // Skip this unverified row (i.e., do not return it to the client). Just returning an empty row is
+                // sufficient to do that
+                row.clear();
+                return;
+            }
+            // code == RebuildReturnCode.INDEX_ROW_EXISTS.getValue()
             // Open a new scanner starting from the current row
             indexScan.setStartRow(indexRowKey);
             scanner = region.getScanner(indexScan);
-            // Scan the newly build index row
             scanner.next(row);
             if (row.isEmpty()) {
+                // This means the index row has been deleted before opening the new scanner.
                 return;
             }
-            boolean indexRowExists = false;
             // Check if the index row still exist after rebuild
-            while (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
-                    indexRowKey, 0, indexRowKey.length) == 0) {
-                indexRowExists = true;
+            if  (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
+                    indexRowKey, 0, indexRowKey.length) != 0) {
+                // This means the index row has been deleted before opening the new scanner. We got a different row.
+                // If this row is "verified" (or empty) then we are good to go.
                 if (verifyRowAndRemoveEmptyColumn(row)) {
-                    // The index row status is "verified". This row is good to return to the client. We are done here.
                     return;
                 }
-                // The index row is still "unverified" after rebuild. This means either that the data table row timestamp is
-                // lower than than the timestamp of the unverified index row (ts) and the index row that is built from
-                // the data table row is masked by this unverified row, or that the corresponding data table row does
-                // exist
+                // The row is "unverified". Rewind the scanner and let the row be scanned again
+                // so that it can be repaired
+                scanner.close();
+                scanner = region.getScanner(indexScan);
+                row.clear();
+                return;
+            }
+            // The index row still exists after rebuild
+            // Check if the index row is still unverified
+            if (verifyRowAndRemoveEmptyColumn(row)) {
+                // The index row status is "verified". This row is good to return to the client. We are done here.
+                return;
+            }
+            // The index row is still "unverified" after rebuild. This means that the data table row timestamp is
+            // lower than the timestamp of the unverified index row (ts) and the index row that is built from
+            // the data table row is masked by this unverified row. This happens if the first phase updates (i.e.,
+            // unverified index row updates) complete but the second phase updates (i.e., data table updates) fail.
+            // There could be back to back such events so we need a loop to go through them
+            do {
                 // First delete the unverified row from index if it is old enough
                 deleteRowIfAgedEnough(indexRowKey, row, ts, true);
-                // Now we will do a single row scan to retrieve the verified index row build from the data table row
-                // if such an index row exists. Note we cannot read all versions in one scan as the max number of row
-                // versions for an index table can be 1. In that case, we will get only one (i.e., the most recent
-                // version instead of all versions
+                // Now we will do a single row scan to retrieve the verified index row built from the data table row.
+                // Note we cannot read all versions in one scan as the max number of row versions for an index table
+                // can be 1. In that case, we will get only one (i.e., the most recent) version instead of all versions
                 singleRowIndexScan.setStartRow(indexRowKey);
                 singleRowIndexScan.setStopRow(indexRowKey);
                 singleRowIndexScan.setTimeRange(minTimestamp, ts);
@@ -322,28 +382,26 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 singleRowScanner.next(row);
                 singleRowScanner.close();
                 if (row.isEmpty()) {
-                    // This means that the data row did not exist, so we need to skip this unverified row (i.e., do not
-                    // return it to the client). Just retuning empty row is sufficient to do that
+                    LOG.error("Could not find the newly rebuilt index row with row key " +
+                            Bytes.toStringBinary(indexRowKey) + " for table " +
+                            region.getRegionInfo().getTable().getNameAsString());
+                    // This was not expected. The newly rebuilt index row must have been deleted before the new
+                    // scanner was opened, possibly by compaction
                     return;
                 }
-                ts = getMaxTimestamp(row);
-            }
-            if (indexRowExists) {
-                // This means there does not exist a data row for the unverified index row. Skip this row. To do that
-                // just return empty row.
-                row.clear();
-                return;
-            } else {
-                // This means the index row has been deleted. We got the next row
-                // If the next row is "verified" (or empty) then we are good to go.
                 if (verifyRowAndRemoveEmptyColumn(row)) {
+                    // The index row status is "verified". This row is good to return to the client. We are done here.
                     return;
                 }
-                // The next row is "unverified". Rewind the scanner and let the row be scanned again
-                // so that it can be repaired
-                scanner = region.getScanner(indexScan);
-                row.clear();
-            }
+                ts = getMaxTimestamp(row);
+            } while (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
+                    indexRowKey, 0, indexRowKey.length) == 0);
+            // This should not happen at all
+            Cell cell = row.get(0);
+            byte[] rowKey = CellUtil.cloneRow(cell);
+            throw new DoNotRetryIOException("The scan returned a row with row key (" + Bytes.toStringBinary(rowKey) +
+                     ") different than indexRowKey (" + Bytes.toStringBinary(indexRowKey) + ") for table " +
+                        region.getRegionInfo().getTable().getNameAsString());
         }
 
         private boolean isEmptyColumn(Cell cell) {
@@ -360,8 +418,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             get.addColumn(emptyCF, emptyCQ);
             Result result = region.get(get);
             if (result.isEmpty()) {
-                LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString());
-                return false;
+                throw new DoNotRetryIOException("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString());
             }
             if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, VERIFIED_BYTES.length,
                     VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
@@ -392,8 +449,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                     return true;
                 }
             }
-            byte[] rowKey = new byte[cell.getRowLength()];
-            System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+            byte[] rowKey = CellUtil.cloneRow(cell);
             return verifyRow(rowKey);
         }
 
@@ -423,8 +479,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             } else {
                 long repairStart = EnvironmentEdgeManager.currentTimeMillis();
 
-                byte[] rowKey = new byte[cell.getRowLength()];
-                System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+                byte[] rowKey = CellUtil.cloneRow(cell);
                 long ts = getMaxTimestamp(cellList);
                 cellList.clear();