You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/11/06 19:16:23 UTC
[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5558 Eliminate the second single data row scan during read repairs
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
new f92a383 PHOENIX-5558 Eliminate the second single data row scan during read repairs
f92a383 is described below
commit f92a383175f977f918a14fd570a9fbf9c130fd05
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sun Nov 3 09:41:27 2019 -0800
PHOENIX-5558 Eliminate the second single data row scan during read repairs
---
.../coprocessor/BaseScannerRegionObserver.java | 1 +
.../UngroupedAggregateRegionObserver.java | 46 ++++++++++++++--
.../apache/phoenix/index/GlobalIndexChecker.java | 64 ++++++----------------
3 files changed, 60 insertions(+), 51 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index cb4d0af..7b5d246 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -114,6 +114,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String PHYSICAL_DATA_TABLE_NAME = "_PhysicalDataTableName";
public static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";
public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
+ public static final String INDEX_ROW_KEY = "_IndexRowKey";
public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 8711f34..8477625 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -37,9 +37,7 @@ import java.security.PrivilegedExceptionAction;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -1068,8 +1066,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private boolean useProto = true;
private Scan scan;
private RegionScanner innerScanner;
- final Region region;
- IndexMaintainer indexMaintainer;
+ private Region region;
+ private IndexMaintainer indexMaintainer;
+ private byte[] indexRowKey = null;
IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
final Configuration config) {
@@ -1097,6 +1096,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
this.scan = scan;
this.innerScanner = innerScanner;
this.region = region;
+ indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
}
@Override
@@ -1160,6 +1160,35 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
return uuidValue;
}
+ private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
+ ValueGetter getter = new ValueGetter() {
+ final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+
+ @Override
+ public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+ List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
+ if (cellList == null || cellList.isEmpty()) {
+ return null;
+ }
+ Cell cell = cellList.get(0);
+ valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ return valuePtr;
+ }
+
+ @Override
+ public byte[] getRowKey() {
+ return put.getRow();
+ }
+ };
+ byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(put.getRow()),
+ null, null, HConstants.LATEST_TIMESTAMP);
+ if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
+ indexRowKey, 0, indexRowKey.length) != 0) {
+ return false;
+ }
+ return true;
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
int rowCount = 0;
@@ -1190,6 +1219,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
del.addDeleteMarker(cell);
}
}
+ if (indexRowKey != null) {
+ // GlobalIndexChecker passed the index row key. This is to build a single index row.
+ // Check if the data table row we have just scanned matches with the index row key.
+ // If not, there is no need to build the index row from this data table row,
+ // and just return zero row count.
+ if (!checkIndexRow(indexRowKey, put)) {
+ break;
+ }
+ }
uuidValue = commitIfReady(uuidValue);
if (!scan.isRaw()) {
Delete deleteMarkers = generateDeleteMarkers(row);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index ecc720f..9cd78b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
@@ -61,6 +60,8 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -86,7 +87,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
private Scan scan;
private Scan indexScan;
private Scan singleRowIndexScan;
- private Scan singleRowDataScan;
private Scan buildIndexScan = null;
private Table dataHTable = null;
private byte[] emptyCF;
@@ -224,53 +224,10 @@ public class GlobalIndexChecker extends BaseRegionObserver {
}
}
- private boolean doesDataRowExist(byte[] indexRowKey, byte[] dataRowKey) throws IOException {
- singleRowDataScan.setStartRow(dataRowKey);
- singleRowDataScan.setStopRow(dataRowKey);
- singleRowDataScan.setTimeRange(0, maxTimestamp);
- try (ResultScanner resultScanner = dataHTable.getScanner(singleRowDataScan)) {
- final Result result = resultScanner.next();
- if (result == null) {
- // There is no data table row for this index unverified index row. We need to skip it.
- return false;
- }
- else {
- ValueGetter getter = new ValueGetter() {
- final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
-
- @Override
- public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
- Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
- if (cell == null) {
- return null;
- }
- valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- return valuePtr;
- }
-
- @Override
- public byte[] getRowKey() {
- return result.getRow();
- }
- };
- byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
- if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
- indexRowKey, 0, indexRowKey.length) != 0) {
- // The row key of the index row that has been built is different than the row key of the unverified
- // index row
- return false;
- }
- }
- } catch (Throwable t) {
- ServerUtil.throwIOException(dataHTable.getName().toString(), t);
- }
- return true;
- }
private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
// Build the data table row key from the index table row key
if (buildIndexScan == null) {
buildIndexScan = new Scan();
- singleRowDataScan = new Scan();
indexScan = new Scan(scan);
singleRowIndexScan = new Scan(scan);
byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
@@ -295,6 +252,11 @@ public class GlobalIndexChecker extends BaseRegionObserver {
buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+ // Scan only columns included in the index table plus the empty column
+ for (ColumnReference column : indexMaintainer.getAllColumns()) {
+ buildIndexScan.addColumn(column.getFamily(), column.getQualifier());
+ }
+ buildIndexScan.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
}
// Rebuild the index row from the corresponding the row in the the data table
// Get the data row key from the index row key
@@ -302,12 +264,20 @@ public class GlobalIndexChecker extends BaseRegionObserver {
buildIndexScan.setStartRow(dataRowKey);
buildIndexScan.setStopRow(dataRowKey);
buildIndexScan.setTimeRange(0, maxTimestamp);
+ // Pass the index row key to the partial index builder which will build the index row only when the data
+ // table row for dataRowKey matches with this unverified index row.
+ buildIndexScan.setAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY, indexRowKey);
+ Result result = null;
try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
- resultScanner.next();
+ result = resultScanner.next();
} catch (Throwable t) {
ServerUtil.throwIOException(dataHTable.getName().toString(), t);
}
- if (!doesDataRowExist(indexRowKey, dataRowKey)) {
+ // A single cell will be returned. We decode that here
+ byte[] value = result.value();
+ long rowCount = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(value), SortOrder.getDefault());
+ if (rowCount == 0) {
+ // This means there does not exist a data table row for this unverified index row
// Delete the unverified row from index if it is old enough
deleteRowIfAgedEnough(indexRowKey, row, ts, false);
// Skip this unverified row (i.e., do not return it to the client). Just retuning empty row is