You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/11/06 19:16:23 UTC

[phoenix] branch 4.x-HBase-1.3 updated: PHOENIX-5558 Eliminate the second single data row scan during read repairs

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new f92a383  PHOENIX-5558 Eliminate the second single data row scan during read repairs
f92a383 is described below

commit f92a383175f977f918a14fd570a9fbf9c130fd05
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sun Nov 3 09:41:27 2019 -0800

    PHOENIX-5558 Eliminate the second single data row scan during read repairs
---
 .../coprocessor/BaseScannerRegionObserver.java     |  1 +
 .../UngroupedAggregateRegionObserver.java          | 46 ++++++++++++++--
 .../apache/phoenix/index/GlobalIndexChecker.java   | 64 ++++++----------------
 3 files changed, 60 insertions(+), 51 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index cb4d0af..7b5d246 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -114,6 +114,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String PHYSICAL_DATA_TABLE_NAME = "_PhysicalDataTableName";
     public static final String EMPTY_COLUMN_FAMILY_NAME = "_EmptyCFName";
     public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
+    public static final String INDEX_ROW_KEY = "_IndexRowKey";
     
     public final static byte[] REPLAY_TABLE_AND_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(1);
     public final static byte[] REPLAY_ONLY_INDEX_WRITES = PUnsignedTinyint.INSTANCE.toBytes(2);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 8711f34..8477625 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -37,9 +37,7 @@ import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -1068,8 +1066,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         private boolean useProto = true;
         private Scan scan;
         private RegionScanner innerScanner;
-        final Region region;
-        IndexMaintainer indexMaintainer;
+        private Region region;
+        private IndexMaintainer indexMaintainer;
+        private byte[] indexRowKey = null;
 
         IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
                                    final Configuration config) {
@@ -1097,6 +1096,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             this.scan = scan;
             this.innerScanner = innerScanner;
             this.region = region;
+            indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
         }
 
         @Override
@@ -1160,6 +1160,35 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             return uuidValue;
         }
 
+        // Tells whether the index row key derived from the given data table Put equals indexRowKey.
+        private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
+            // Serves column values to the index maintainer directly from the cells carried by the Put
+            ValueGetter putValueGetter = new ValueGetter() {
+                final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+                @Override
+                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+                    List<Cell> cells = put.get(ref.getFamily(), ref.getQualifier());
+                    if (cells != null && !cells.isEmpty()) {
+                        // Only the first cell stored for the column is consulted
+                        Cell first = cells.get(0);
+                        ptr.set(first.getValueArray(), first.getValueOffset(), first.getValueLength());
+                        return ptr;
+                    }
+                    return null;
+                }
+
+                @Override
+                public byte[] getRowKey() {
+                    return put.getRow();
+                }
+            };
+            ImmutableBytesWritable dataRowKeyPtr = new ImmutableBytesWritable(put.getRow());
+            byte[] rebuiltKey = indexMaintainer.buildRowKey(putValueGetter, dataRowKeyPtr, null, null,
+                    HConstants.LATEST_TIMESTAMP);
+            return Bytes.compareTo(rebuiltKey, 0, rebuiltKey.length, indexRowKey, 0, indexRowKey.length) == 0;
+        }
+
         @Override
         public boolean next(List<Cell> results) throws IOException {
             int rowCount = 0;
@@ -1190,6 +1219,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     del.addDeleteMarker(cell);
                                 }
                             }
+                            if (indexRowKey != null) {
+                                // GlobalIndexChecker passed the index row key. This is to build a single index row.
+                                // Check if the data table row we have just scanned matches with the index row key.
+                                // If not, there is no need to build the index row from this data table row,
+                                // and just return zero row count.
+                                if (!checkIndexRow(indexRowKey, put)) {
+                                    break;
+                                }
+                            }
                             uuidValue = commitIfReady(uuidValue);
                             if (!scan.isRaw()) {
                                 Delete deleteMarkers = generateDeleteMarkers(row);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index ecc720f..9cd78b3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
@@ -61,6 +60,8 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -86,7 +87,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
         private Scan scan;
         private Scan indexScan;
         private Scan singleRowIndexScan;
-        private Scan singleRowDataScan;
         private Scan buildIndexScan = null;
         private Table dataHTable = null;
         private byte[] emptyCF;
@@ -224,53 +224,10 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             }
         }
 
-        private boolean doesDataRowExist(byte[] indexRowKey, byte[] dataRowKey) throws IOException {
-            singleRowDataScan.setStartRow(dataRowKey);
-            singleRowDataScan.setStopRow(dataRowKey);
-            singleRowDataScan.setTimeRange(0, maxTimestamp);
-            try (ResultScanner resultScanner = dataHTable.getScanner(singleRowDataScan)) {
-                final Result result = resultScanner.next();
-                if (result == null) {
-                    // There is no data table row for this index unverified index row. We need to skip it.
-                    return false;
-                }
-                else {
-                    ValueGetter getter = new ValueGetter() {
-                        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
-
-                        @Override
-                        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
-                            Cell cell = result.getColumnLatestCell(ref.getFamily(), ref.getQualifier());
-                            if (cell == null) {
-                                return null;
-                            }
-                            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                            return valuePtr;
-                        }
-
-                        @Override
-                        public byte[] getRowKey() {
-                            return result.getRow();
-                        }
-                    };
-                    byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
-                    if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
-                            indexRowKey, 0, indexRowKey.length) != 0) {
-                        // The row key of the index row that has been built is different than the row key of the unverified
-                        // index row
-                        return false;
-                    }
-                }
-            } catch (Throwable t) {
-                ServerUtil.throwIOException(dataHTable.getName().toString(), t);
-            }
-            return true;
-        }
         private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
             // Build the data table row key from the index table row key
             if (buildIndexScan == null) {
                 buildIndexScan = new Scan();
-                singleRowDataScan = new Scan();
                 indexScan = new Scan(scan);
                 singleRowIndexScan = new Scan(scan);
                 byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
@@ -295,6 +252,11 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
                 buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
                 buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+                // Scan only columns included in the index table plus the empty column
+                for (ColumnReference column : indexMaintainer.getAllColumns()) {
+                    buildIndexScan.addColumn(column.getFamily(), column.getQualifier());
+                }
+                buildIndexScan.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier());
             }
             // Rebuild the index row from the corresponding the row in the the data table
             // Get the data row key from the index row key
@@ -302,12 +264,20 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             buildIndexScan.setStartRow(dataRowKey);
             buildIndexScan.setStopRow(dataRowKey);
             buildIndexScan.setTimeRange(0, maxTimestamp);
+            // Pass the index row key to the partial index builder which will build the index row only when the data
+            // table row for dataRowKey matches with this unverified index row.
+            buildIndexScan.setAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY, indexRowKey);
+            Result result = null;
             try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
-                resultScanner.next();
+                result = resultScanner.next();
             } catch (Throwable t) {
                 ServerUtil.throwIOException(dataHTable.getName().toString(), t);
             }
-            if (!doesDataRowExist(indexRowKey, dataRowKey)) {
+            // The scan returns a single cell that carries the row count as a long. We decode that here.
+            byte[] value = result.value();
+            long rowCount = PLong.INSTANCE.getCodec().decodeLong(new ImmutableBytesWritable(value), SortOrder.getDefault());
+            if (rowCount == 0) {
+                // This means there does not exist a data table row for this unverified index row
                 // Delete the unverified row from index if it is old enough
                 deleteRowIfAgedEnough(indexRowKey, row, ts, false);
                 // Skip this unverified row (i.e., do not return it to the client). Just retuning empty row is