You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/11/15 22:30:33 UTC
[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5564 Restructure
read repair to improve readability and correctness
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new cf61722 PHOENIX-5564 Restructure read repair to improve readability and correctness
cf61722 is described below
commit cf617225defa340771f843651d0575331d229adf
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sat Nov 9 17:05:18 2019 -0800
PHOENIX-5564 Restructure read repair to improve readability and correctness
---
.../UngroupedAggregateRegionObserver.java | 26 ++--
.../apache/phoenix/index/GlobalIndexChecker.java | 141 ++++++++++++++-------
2 files changed, 113 insertions(+), 54 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c2b53a6..23091a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -97,6 +97,7 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -1146,8 +1147,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
if (!includedColumns.contains(column)) {
if (del == null) {
Cell cell = row.get(0);
- rowKey = new byte[cell.getRowLength()];
- System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+ rowKey = CellUtil.cloneRow(cell);
del = new Delete(rowKey);
}
del.addColumns(column.getFamily(), column.getQualifier(), ts);
@@ -1225,15 +1225,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
del.addDeleteMarker(cell);
}
}
- if (indexRowKey != null) {
- // GlobalIndexChecker passed the index row key. This is to build a single index row.
- // Check if the data table row we have just scanned matches with the index row key.
- // If not, there is no need to build the index row from this data table row,
- // and just return zero row count.
- if (!checkIndexRow(indexRowKey, put)) {
- break;
- }
- }
uuidValue = commitIfReady(uuidValue);
if (!scan.isRaw()) {
Delete deleteMarkers = generateDeleteMarkers(row);
@@ -1243,6 +1234,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
uuidValue = commitIfReady(uuidValue);
}
}
+ if (indexRowKey != null) {
+ // GlobalIndexChecker passed the index row key. This is to build a single index row.
+ // Check if the data table row we have just scanned matches with the index row key.
+ // If not, there is no need to build the index row from this data table row,
+ // and just return zero row count.
+ if (checkIndexRow(indexRowKey, put)) {
+ rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+ }
+ else {
+ rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+ }
+ break;
+ }
rowCount++;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 1a737ba..48794c1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -70,11 +70,43 @@ import org.apache.phoenix.util.ServerUtil;
*
* Coprocessor that verifies the scanned rows of a non-transactional global index.
*
+ * If an index row is unverified (i.e., the row status is unverified), the following steps are taken :
+ * (1) We generate the data row key from the index row key, and check if the data row exists. If not, this unverified
+ * index row is skipped (i.e., not returned to the client), and it is deleted if it is old enough. The age check is
+ * necessary in order not to delete the index rows that are currently being updated. If the data row exists,
+ * we continue with the rest of the steps.
+ * (2) The index row is rebuilt from the data row.
+ * (3) The current scanner is closed as the newly rebuilt row will not be visible to the current scanner.
+ * (4) if the data row does not point back to the unverified index row (i.e., the index row key generated from the data
+ * row does not match with the row key of the unverified index row), this unverified row is skipped and it is
+ * deleted if it is old enough. A new scanner is opened starting from the index row after this unverified index row.
+ * (5) if the data row points back to the unverified index row, then a new scanner is opened starting from the index row.
+ * The next row is scanned to check if it is verified. if it is verified, it is returned to the client. If not, then
+ * it means the data table row timestamp is lower than the timestamp of the unverified index row, and
+ * the index row that has been rebuilt from the data table row is masked by this unverified row. This happens if the
+ * first phase updates (i.e., unverified index row updates) complete but the second phase updates (i.e., data table
+ * row updates) fail. There could be back to back such events so we need to scan older versions to retrieve
+ * the verified version that is masked by the unverified version(s).
+ *
*/
public class GlobalIndexChecker extends BaseRegionObserver {
private static final Log LOG = LogFactory.getLog(GlobalIndexChecker.class);
private HTableFactory hTableFactory;
private GlobalIndexCheckerSource metricsSource;
+ public enum RebuildReturnCode {
+ NO_DATA_ROW(0),
+ NO_INDEX_ROW(1),
+ INDEX_ROW_EXISTS(2);
+ private int value;
+
+ RebuildReturnCode(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+ }
/**
* Class that verifies a given row of a non-transactional global index.
@@ -264,8 +296,8 @@ public class GlobalIndexChecker extends BaseRegionObserver {
buildIndexScan.withStartRow(dataRowKey, true);
buildIndexScan.withStopRow(dataRowKey, true);
buildIndexScan.setTimeRange(0, maxTimestamp);
- // Pass the index row key to the partial index builder which will build the index row only when the data
- // table row for dataRowKey matches with this unverified index row.
+ // Pass the index row key to the partial index builder which will rebuild the index row and check if the
+ // row key of this rebuilt index row matches with the passed index row key
buildIndexScan.setAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY, indexRowKey);
Result result = null;
try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
@@ -275,9 +307,10 @@ public class GlobalIndexChecker extends BaseRegionObserver {
}
// A single cell will be returned. We decode that here
byte[] value = result.value();
- long rowCount = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(value), SortOrder.getDefault());
- if (rowCount == 0) {
- // This means there does not exist a data table row for this unverified index row
+ long code = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(value), SortOrder.getDefault());
+ if (code == RebuildReturnCode.NO_DATA_ROW.getValue()) {
+ // This means there does not exist a data table row for the data row key derived from
+ // this unverified index row. So, no index row has been built
// Delete the unverified row from index if it is old enough
deleteRowIfAgedEnough(indexRowKey, row, ts, false);
// Skip this unverified row (i.e., do not return it to the client). Just retuning empty row is
@@ -285,35 +318,62 @@ public class GlobalIndexChecker extends BaseRegionObserver {
row.clear();
return;
}
- // Close the current scanner as the newly build row will not be visible to it
+ // An index row has been built. Close the current scanner as the newly built row will not be visible to it
scanner.close();
+ if (code == RebuildReturnCode.NO_INDEX_ROW.getValue()) {
+ // This means there exists a data table row for the data row key derived from this unverified index row
+ // but the data table row does not point back to the index row.
+ // Delete the unverified row from index if it is old enough
+ deleteRowIfAgedEnough(indexRowKey, row, ts, false);
+ // Open a new scanner starting from the row after the current row
+ indexScan.withStartRow(indexRowKey, false);
+ scanner = region.getScanner(indexScan);
+ // Skip this unverified row (i.e., do not return it to the client). Just returning an empty row is
+ // sufficient to do that
+ row.clear();
+ return;
+ }
+ // code == RebuildReturnCode.INDEX_ROW_EXISTS.getValue()
// Open a new scanner starting from the current row
indexScan.withStartRow(indexRowKey, true);
scanner = region.getScanner(indexScan);
- // Scan the newly build index row
scanner.next(row);
if (row.isEmpty()) {
+ // This means the index row has been deleted before opening the new scanner.
return;
}
- boolean indexRowExists = false;
// Check if the index row still exist after rebuild
- while (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
- indexRowKey, 0, indexRowKey.length) == 0) {
- indexRowExists = true;
+ if (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
+ indexRowKey, 0, indexRowKey.length) != 0) {
+ // This means the index row has been deleted before opening the new scanner. We got a different row
+ // If this row is "verified" (or empty) then we are good to go.
if (verifyRowAndRemoveEmptyColumn(row)) {
- // The index row status is "verified". This row is good to return to the client. We are done here.
return;
}
- // The index row is still "unverified" after rebuild. This means either that the data table row timestamp is
- // lower than than the timestamp of the unverified index row (ts) and the index row that is built from
- // the data table row is masked by this unverified row, or that the corresponding data table row does
- // exist
+ // The row is "unverified". Rewind the scanner and let the row be scanned again
+ // so that it can be repaired
+ scanner.close();
+ scanner = region.getScanner(indexScan);
+ row.clear();
+ return;
+ }
+ // The index row still exists after rebuild
+ // Check if the index row is still unverified
+ if (verifyRowAndRemoveEmptyColumn(row)) {
+ // The index row status is "verified". This row is good to return to the client. We are done here.
+ return;
+ }
+ // The index row is still "unverified" after rebuild. This means that the data table row timestamp is
+ // lower than the timestamp of the unverified index row (ts) and the index row that is built from
+ // the data table row is masked by this unverified row. This happens if the first phase updates (i.e.,
+ // unverified index row updates) complete but the second phase updates (i.e., data table updates) fail.
+ // There could be back to back such events so we need a loop to go through them
+ do {
// First delete the unverified row from index if it is old enough
deleteRowIfAgedEnough(indexRowKey, row, ts, true);
- // Now we will do a single row scan to retrieve the verified index row build from the data table row
- // if such an index row exists. Note we cannot read all versions in one scan as the max number of row
- // versions for an index table can be 1. In that case, we will get only one (i.e., the most recent
- // version instead of all versions
+ // Now we will do a single row scan to retrieve the verified index row built from the data table row.
+ // Note we cannot read all versions in one scan as the max number of row versions for an index table
+ // can be 1. In that case, we will get only one (i.e., the most recent) version instead of all versions
singleRowIndexScan.withStartRow(indexRowKey, true);
singleRowIndexScan.withStopRow(indexRowKey, true);
singleRowIndexScan.setTimeRange(minTimestamp, ts);
@@ -322,28 +382,26 @@ public class GlobalIndexChecker extends BaseRegionObserver {
singleRowScanner.next(row);
singleRowScanner.close();
if (row.isEmpty()) {
- // This means that the data row did not exist, so we need to skip this unverified row (i.e., do not
- // return it to the client). Just retuning empty row is sufficient to do that
+ LOG.error("Could not find the newly rebuilt index row with row key " +
+ Bytes.toStringBinary(indexRowKey) + " for table " +
+ region.getRegionInfo().getTable().getNameAsString());
+ // This was not expected. The newly built index row must have been deleted before the new scanner was opened,
+ // possibly by compaction
return;
}
- ts = getMaxTimestamp(row);
- }
- if (indexRowExists) {
- // This means there does not exist a data row for the unverified index row. Skip this row. To do that
- // just return empty row.
- row.clear();
- return;
- } else {
- // This means the index row has been deleted. We got the next row
- // If the next row is "verified" (or empty) then we are good to go.
if (verifyRowAndRemoveEmptyColumn(row)) {
+ // The index row status is "verified". This row is good to return to the client. We are done here.
return;
}
- // The next row is "unverified". Rewind the scanner and let the row be scanned again
- // so that it can be repaired
- scanner = region.getScanner(indexScan);
- row.clear();
- }
+ ts = getMaxTimestamp(row);
+ } while (Bytes.compareTo(row.get(0).getRowArray(), row.get(0).getRowOffset(), row.get(0).getRowLength(),
+ indexRowKey, 0, indexRowKey.length) == 0);
+ // This should not happen at all
+ Cell cell = row.get(0);
+ byte[] rowKey = CellUtil.cloneRow(cell);
+ throw new DoNotRetryIOException("The scan returned a row with row key (" + Bytes.toStringBinary(rowKey) +
+ ") different than indexRowKey (" + Bytes.toStringBinary(indexRowKey) + ") for table " +
+ region.getRegionInfo().getTable().getNameAsString());
}
private boolean isEmptyColumn(Cell cell) {
@@ -360,8 +418,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
get.addColumn(emptyCF, emptyCQ);
Result result = region.get(get);
if (result.isEmpty()) {
- LOG.warn("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString());
- return false;
+ throw new DoNotRetryIOException("The empty column does not exist in a row in " + region.getRegionInfo().getTable().getNameAsString());
}
if (Bytes.compareTo(result.getValue(emptyCF, emptyCQ), 0, VERIFIED_BYTES.length,
VERIFIED_BYTES, 0, VERIFIED_BYTES.length) != 0) {
@@ -392,8 +449,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
return true;
}
}
- byte[] rowKey = new byte[cell.getRowLength()];
- System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+ byte[] rowKey = CellUtil.cloneRow(cell);
return verifyRow(rowKey);
}
@@ -423,8 +479,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
} else {
long repairStart = EnvironmentEdgeManager.currentTimeMillis();
- byte[] rowKey = new byte[cell.getRowLength()];
- System.arraycopy(cell.getRowArray(), cell.getRowOffset(), rowKey, 0, cell.getRowLength());
+ byte[] rowKey = CellUtil.cloneRow(cell);
long ts = getMaxTimestamp(cellList);
cellList.clear();