You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/06 00:42:28 UTC
[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5505 Index read
repair does not repair unverified rows with higher timestamp (addendum)
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
new e64ff2d PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)
e64ff2d is described below
commit e64ff2d01dbbae388d6bd9d9cd060af10c7b41ed
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sat Oct 5 16:30:07 2019 -0700
PHOENIX-5505 Index read repair does not repair unverified rows with higher timestamp (addendum)
---
.../org/apache/phoenix/util/IndexScrutinyIT.java | 12 +---
.../phoenix/hbase/index/IndexRegionObserver.java | 17 +++++-
.../apache/phoenix/index/GlobalIndexChecker.java | 70 ++++++++++++++++++++--
3 files changed, 82 insertions(+), 17 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
index 20ec965..8af61c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/util/IndexScrutinyIT.java
@@ -91,17 +91,11 @@ public class IndexScrutinyIT extends ParallelStatsDisabledIT {
conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES('a','ccc','1')");
conn.commit();
- // Writing index directly will generate unverified rows with no corresponding data rows. These rows will not be visible to the applications
+ // Writing to the index directly will generate unverified rows. These rows will be recovered if the
+ // corresponding data row exists
conn.createStatement().executeUpdate("UPSERT INTO " + fullIndexName + " VALUES ('ccc','a','2')");
conn.commit();
- try {
- IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
- fail();
- } catch (AssertionError e) {
- assertEquals("Expected data table row count to match expected:<2> but was:<1>", e.getMessage());
- }
+ assertEquals(2, IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName));
}
}
-
-
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 7c37c7a..1820194 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -159,6 +159,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
// batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the
// row (i.e., concurrent updates).
private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
+ private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
private BatchMutateContext(int clientVersion) {
this.clientVersion = clientVersion;
@@ -374,18 +375,27 @@ public class IndexRegionObserver extends BaseRegionObserver {
}
}
- private void lockRows(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) throws IOException {
+ private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) {
for (int i = 0; i < miniBatchOp.size(); i++) {
if (miniBatchOp.getOperationStatus(i) == IGNORE) {
continue;
}
Mutation m = miniBatchOp.getOperation(i);
if (this.builder.isEnabled(m)) {
- context.rowLocks.add(lockManager.lockRow(m.getRow(), rowLockWaitDuration));
+ ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+ if (!context.rowsToLock.contains(row)) {
+ context.rowsToLock.add(row);
+ }
}
}
}
+ private void lockRows(BatchMutateContext context) throws IOException {
+ for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+ context.rowLocks.add(lockManager.lockRow(rowKey, rowLockWaitDuration));
+ }
+ }
+
private void populatePendingRows(BatchMutateContext context, long now) {
for (RowLock rowLock : context.rowLocks) {
ImmutableBytesPtr rowKey = rowLock.getRowKey();
@@ -598,7 +608,8 @@ public class IndexRegionObserver extends BaseRegionObserver {
* while determining the index updates
*/
if (replayWrite == null) {
- lockRows(miniBatchOp, context);
+ populateRowsToLock(miniBatchOp, context);
+ lockRows(context);
}
long now = EnvironmentEdgeManager.currentTimeMillis();
// Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 6b3b368..fac5cb2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableFactory;
@@ -84,6 +86,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
private Scan scan;
private Scan indexScan;
private Scan singleRowIndexScan;
+ private Scan singleRowDataScan;
private Scan buildIndexScan = null;
private Table dataHTable = null;
private byte[] emptyCF;
@@ -207,22 +210,71 @@ public class GlobalIndexChecker extends BaseRegionObserver {
}
}
- private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts) throws IOException {
+ private void deleteRowIfAgedEnough(byte[] indexRowKey, List<Cell> row, long ts, boolean specific) throws IOException {
if ((EnvironmentEdgeManager.currentTimeMillis() - ts) > ageThreshold) {
Delete del = new Delete(indexRowKey, ts);
- // We are deleting a specific version of a row so the flowing loop is for that
- for (Cell cell : row) {
- del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
+ if (specific) {
+ // We are deleting a specific version of a row so the following loop is for that
+ for (Cell cell : row) {
+ del.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), cell.getTimestamp());
+ }
}
Mutation[] mutations = new Mutation[]{del};
region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
}
+ private boolean doesDataRowExist(byte[] indexRowKey, byte[] dataRowKey) throws IOException {
+ singleRowDataScan.setStartRow(dataRowKey);
+ singleRowDataScan.setStopRow(dataRowKey);
+ singleRowDataScan.setTimeRange(0, maxTimestamp);
+ try (ResultScanner resultScanner = dataHTable.getScanner(singleRowDataScan)) {
+ final Result result = resultScanner.next();
+ if (result == null) {
+ // There is no data table row for this unverified index row. We need to skip it.
+ return false;
+ }
+ else {
+ ValueGetter getter = new ValueGetter() {
+ final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+
+ @Override
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+ Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
+ if (cell == null) {
+ return null;
+ }
+ valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ return valuePtr;
+ }
+
+ @Override
+ public byte[] getRowKey() {
+ return result.getRow();
+ }
+ };
+ for (Cell cell : result.rawCells()) {
+ String cellString = cell.toString();
+ LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
+ }
+ byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
+ if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
+ indexRowKey, 0, indexRowKey.length) != 0) {
+ // The row key of the index row that has been built is different than the row key of the unverified
+ // index row
+ return false;
+ }
+ }
+ } catch (Throwable t) {
+ ServerUtil.throwIOException(dataHTable.getName().toString(), t);
+ }
+ return true;
+ }
private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
// Build the data table row key from the index table row key
if (buildIndexScan == null) {
buildIndexScan = new Scan();
+ singleRowDataScan = new Scan();
indexScan = new Scan(scan);
singleRowIndexScan = new Scan(scan);
byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
@@ -262,6 +314,14 @@ public class GlobalIndexChecker extends BaseRegionObserver {
} catch (Throwable t) {
ServerUtil.throwIOException(dataHTable.getName().toString(), t);
}
+ if (!doesDataRowExist(indexRowKey, dataRowKey)) {
+ // Delete the unverified row from index if it is old enough
+ deleteRowIfAgedEnough(indexRowKey, row, ts, false);
+ // Skip this unverified row (i.e., do not return it to the client). Just returning an empty row is
+ // sufficient to do that
+ row.clear();
+ return;
+ }
// Close the current scanner as the newly build row will not be visible to it
scanner.close();
// Open a new scanner starting from the current row
@@ -286,7 +346,7 @@ public class GlobalIndexChecker extends BaseRegionObserver {
// the data table row is masked by this unverified row, or that the corresponding data table row does
// exist
// First delete the unverified row from index if it is old enough
- deleteRowIfAgedEnough(indexRowKey, row, ts);
+ deleteRowIfAgedEnough(indexRowKey, row, ts, true);
// Now we will do a single row scan to retrieve the verified index row build from the data table row
// if such an index row exists. Note we cannot read all versions in one scan as the max number of row
// versions for an index table can be 1. In that case, we will get only one (i.e., the most recent