You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/03/17 19:24:45 UTC
[phoenix] branch PHOENIX-5748-4.x-HBase-1.5 updated: PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebui… (#725)
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch PHOENIX-5748-4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-5748-4.x-HBase-1.5 by this push:
new d6f939b PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebui… (#725)
d6f939b is described below
commit d6f939bc903480675825bea6c3b1b2be04bca599
Author: Swaroopa Kadam <sw...@gmail.com>
AuthorDate: Tue Mar 17 12:24:38 2020 -0700
PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebui… (#725)
PHOENIX-5749: Add unit tests for verifySingleIndexRow() of IndexRebuildRegionScanner
---
.../coprocessor/IndexRebuildRegionScanner.java | 304 ++--------
.../coprocessor/IndexToolVerificationResult.java | 304 ++++++++++
.../index/PhoenixIndexImportDirectReducer.java | 5 +-
.../phoenix/index/VerifySingleIndexRowTest.java | 637 +++++++++++++++++++++
4 files changed, 998 insertions(+), 252 deletions(-)
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 6cb1145..ad549e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -42,15 +42,16 @@ import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATT
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -109,236 +110,14 @@ import com.google.common.collect.Maps;
public class IndexRebuildRegionScanner extends BaseRegionScanner {
- public static class VerificationResult {
- public static class PhaseResult {
- private long validIndexRowCount = 0;
- private long expiredIndexRowCount = 0;
- private long missingIndexRowCount = 0;
- private long invalidIndexRowCount = 0;
-
- public void add(PhaseResult phaseResult) {
- validIndexRowCount += phaseResult.validIndexRowCount;
- expiredIndexRowCount += phaseResult.expiredIndexRowCount;
- missingIndexRowCount += phaseResult.missingIndexRowCount;
- invalidIndexRowCount += phaseResult.invalidIndexRowCount;
- }
-
- public long getTotalCount() {
- return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
- }
-
- @Override
- public String toString() {
- return "PhaseResult{" +
- "validIndexRowCount=" + validIndexRowCount +
- ", expiredIndexRowCount=" + expiredIndexRowCount +
- ", missingIndexRowCount=" + missingIndexRowCount +
- ", invalidIndexRowCount=" + invalidIndexRowCount +
- '}';
- }
- }
-
- private long scannedDataRowCount = 0;
- private long rebuiltIndexRowCount = 0;
- private PhaseResult before = new PhaseResult();
- private PhaseResult after = new PhaseResult();
-
- @Override
- public String toString() {
- return "VerificationResult{" +
- "scannedDataRowCount=" + scannedDataRowCount +
- ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
- ", before=" + before +
- ", after=" + after +
- '}';
- }
-
- public long getScannedDataRowCount() {
- return scannedDataRowCount;
- }
-
- public long getRebuiltIndexRowCount() {
- return rebuiltIndexRowCount;
- }
-
- public long getBeforeRebuildValidIndexRowCount() {
- return before.validIndexRowCount;
- }
-
- public long getBeforeRebuildExpiredIndexRowCount() {
- return before.expiredIndexRowCount;
- }
-
- public long getBeforeRebuildInvalidIndexRowCount() {
- return before.invalidIndexRowCount;
- }
-
- public long getBeforeRebuildMissingIndexRowCount() {
- return before.missingIndexRowCount;
- }
-
- public long getAfterRebuildValidIndexRowCount() {
- return after.validIndexRowCount;
- }
-
- public long getAfterRebuildExpiredIndexRowCount() {
- return after.expiredIndexRowCount;
- }
-
- public long getAfterRebuildInvalidIndexRowCount() {
- return after.invalidIndexRowCount;
- }
-
- public long getAfterRebuildMissingIndexRowCount() {
- return after.missingIndexRowCount;
- }
-
- private void addScannedDataRowCount(long count) {
- this.scannedDataRowCount += count;
- }
-
- private void addRebuiltIndexRowCount(long count) {
- this.rebuiltIndexRowCount += count;
- }
-
- private void addBeforeRebuildValidIndexRowCount(long count) {
- before.validIndexRowCount += count;
- }
-
- private void addBeforeRebuildExpiredIndexRowCount(long count) {
- before.expiredIndexRowCount += count;
- }
-
- private void addBeforeRebuildMissingIndexRowCount(long count) {
- before.missingIndexRowCount += count;
- }
-
- private void addBeforeRebuildInvalidIndexRowCount(long count) {
- before.invalidIndexRowCount += count;
- }
-
- private void addAfterRebuildValidIndexRowCount(long count) {
- after.validIndexRowCount += count;
- }
-
- private void addAfterRebuildExpiredIndexRowCount(long count) {
- after.expiredIndexRowCount += count;
- }
-
- private void addAfterRebuildMissingIndexRowCount(long count) {
- after.missingIndexRowCount += count;
- }
-
- private void addAfterRebuildInvalidIndexRowCount(long count) {
- after.invalidIndexRowCount += count;
- }
-
- private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
- AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
- AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) {
- return true;
- }
- return false;
- }
-
- private long getValue(Cell cell) {
- return Long.parseLong(Bytes.toString(cell.getValueArray(),
- cell.getValueOffset(), cell.getValueLength()));
- }
-
- private void update(Cell cell) {
- if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
- addScannedDataRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
- addRebuiltIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
- addBeforeRebuildValidIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
- addBeforeRebuildExpiredIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
- addBeforeRebuildMissingIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
- addBeforeRebuildInvalidIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
- addAfterRebuildValidIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
- addAfterRebuildExpiredIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
- addAfterRebuildMissingIndexRowCount(getValue(cell));
- } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
- addAfterRebuildInvalidIndexRowCount(getValue(cell));
- }
- }
-
- public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
- // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
- // Search for the place where the trailing 0xFFs start
- int offset = rowKeyPrefix.length;
- while (offset > 0) {
- if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
- break;
- }
- offset--;
- }
- if (offset == 0) {
- // We got an 0xFFFF... (only FFs) stopRow value which is
- // the last possible prefix before the end of the table.
- // So set it to stop at the 'end of the table'
- return HConstants.EMPTY_END_ROW;
- }
- // Copy the right length of the original
- byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
- // And increment the last one
- newStopRow[newStopRow.length - 1]++;
- return newStopRow;
- }
-
- public static VerificationResult getVerificationResult(Table hTable, long ts)
- throws IOException {
- VerificationResult verificationResult = new VerificationResult();
- byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
- byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
- Scan scan = new Scan();
- scan.setStartRow(startRowKey);
- scan.setStopRow(stopRowKey);
- ResultScanner scanner = hTable.getScanner(scan);
- for (Result result = scanner.next(); result != null; result = scanner.next()) {
- for (Cell cell : result.rawCells()) {
- verificationResult.update(cell);
- }
- }
- return verificationResult;
- }
-
- public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
- if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
- return false;
- } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
- if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
- return true;
- }
- } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
- if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
- return true;
- }
- }
- return false;
- }
-
- public void add(VerificationResult verificationResult) {
- scannedDataRowCount += verificationResult.scannedDataRowCount;
- rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
- before.add(verificationResult.before);
- after.add(verificationResult.after);
- }
- }
-
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+ public static final String NO_EXPECTED_MUTATION = "No expected mutation";
+ public static final String
+ ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
private long pageSizeInRows = Long.MAX_VALUE;
private int rowCountPerTask;
private boolean hasMore;
@@ -367,15 +146,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
private RegionCoprocessorEnvironment env;
private HTableFactory hTableFactory;
- private int indexTableTTL;
- private VerificationResult verificationResult;
+ private int indexTableTTL = 0;
+ private IndexToolVerificationResult verificationResult;
private boolean isBeforeRebuilt = true;
private boolean partialRebuild = false;
private int singleRowRebuildReturnCode;
private Map<byte[], NavigableSet<byte[]>> familyMap;
private byte[][] viewConstants;
- IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+ @VisibleForTesting
+ public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env,
UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
super(innerScanner);
@@ -416,7 +196,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
if (valueBytes != null) {
- verificationResult = new VerificationResult();
+ verificationResult = new IndexToolVerificationResult();
verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
if (verifyType != IndexTool.IndexVerifyType.NONE) {
verify = true;
@@ -552,6 +332,24 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return uuidValue;
}
+ @VisibleForTesting
+ public int setIndexTableTTL(int ttl) {
+ indexTableTTL = ttl;
+ return 0;
+ }
+
+ @VisibleForTesting
+ public int setIndexMaintainer(IndexMaintainer indexMaintainer) {
+ this.indexMaintainer = indexMaintainer;
+ return 0;
+ }
+
+ @VisibleForTesting
+ public int setIndexKeyToMutationMap(Map<byte[], List<Mutation>> newTreeMap) {
+ this.indexKeyToMutationMap = newTreeMap;
+ return 0;
+ }
+
public static class SimpleValueGetter implements ValueGetter {
final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
final Put put;
@@ -578,7 +376,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
- private byte[] getIndexRowKey(final Put dataRow) throws IOException {
+ public byte[] getIndexRowKey(final Put dataRow) throws IOException {
ValueGetter valueGetter = new SimpleValueGetter(dataRow);
byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
null, null, HConstants.LATEST_TIMESTAMP);
@@ -594,14 +392,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return true;
}
- private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+ @VisibleForTesting
+ public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg) throws IOException {
logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
errorMsg, null, null);
}
- private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+ @VisibleForTesting
+ public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg, byte[] expectedValue, byte[] actualValue) throws IOException {
final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
@@ -794,7 +594,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return false;
}
- private static List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
+ @VisibleForTesting
+ public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
Put put = null;
Delete del = null;
for (Cell cell : indexRow.rawCells()) {
@@ -845,15 +646,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
* the data table mutation for which the delete marker is added. Thus, the timestamp of these delete markers will be
* higher than the timestamp of index row to be deleted.
*/
- private boolean verifySingleIndexRow(Result indexRow, VerificationResult.PhaseResult verificationPhaseResult)
+ @VisibleForTesting
+ public boolean verifySingleIndexRow(Result indexRow, IndexToolVerificationResult.PhaseResult verificationPhaseResult)
throws IOException {
List<Mutation> expectedMutationList = indexKeyToMutationMap.get(indexRow.getRow());
if (expectedMutationList == null) {
- throw new DoNotRetryIOException("No expected mutation");
+ throw new DoNotRetryIOException(NO_EXPECTED_MUTATION);
}
List<Mutation> actualMutationList = prepareActualIndexMutations(indexRow);
if (actualMutationList == null || actualMutationList.isEmpty()) {
- throw new DoNotRetryIOException("actualMutationList is null or empty");
+ throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
}
Collections.sort(expectedMutationList, MUTATION_TS_DESC_COMPARATOR);
Collections.sort(actualMutationList, MUTATION_TS_DESC_COMPARATOR);
@@ -989,18 +791,18 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
Put put = pair.getFirst();
long ts1 = 0;
if (put != null) {
- ts1 = getMaxTimestamp((Mutation)put);
+ ts1 = getMaxTimestamp(put);
}
Delete del = pair.getSecond();
long ts2 = 0;
if (del != null) {
- ts1 = getMaxTimestamp((Mutation)del);
+ ts1 = getMaxTimestamp(del);
}
return (ts1 > ts2) ? ts1 : ts2;
}
private void verifyIndexRows(List<KeyRange> keys,
- VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+ IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
List<KeyRange> invalidKeys = new ArrayList<>();
ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
Scan indexScan = new Scan();
@@ -1063,7 +865,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
private void addVerifyTask(final List<KeyRange> keys,
- final VerificationResult.PhaseResult verificationPhaseResult) {
+ final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
tasks.add(new Task<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -1081,14 +883,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
});
}
- private void parallelizeIndexVerify(VerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+ private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
int taskCount = (indexKeyToMutationMap.size() + rowCountPerTask - 1) / rowCountPerTask;
tasks = new TaskBatch<>(taskCount);
List<List<KeyRange>> listOfKeyRangeList = new ArrayList<>(taskCount);
- List<VerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
+ List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
listOfKeyRangeList.add(keys);
- VerificationResult.PhaseResult perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+ IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
for (byte[] indexKey: indexKeyToMutationMap.keySet()) {
keys.add(PVarbinary.INSTANCE.getKeyRange(indexKey));
@@ -1096,7 +898,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
addVerifyTask(keys, perTaskVerificationPhaseResult);
keys = new ArrayList<>(rowCountPerTask);
listOfKeyRangeList.add(keys);
- perTaskVerificationPhaseResult = new VerificationResult.PhaseResult();
+ perTaskVerificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
verificationPhaseResultList.add(perTaskVerificationPhaseResult);
}
}
@@ -1118,7 +920,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
throw new IOException(exceptionMessage);
}
}
- for (VerificationResult.PhaseResult result : verificationPhaseResultList) {
+ for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
verificationPhaseResult.add(result);
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
@@ -1162,7 +964,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
private void verifyAndOrRebuildIndex() throws IOException {
- VerificationResult nextVerificationResult = new VerificationResult();
+ IndexToolVerificationResult nextVerificationResult = new IndexToolVerificationResult();
nextVerificationResult.scannedDataRowCount = dataKeyToMutationMap.size();
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
// For these options we start with rebuilding index rows
@@ -1175,7 +977,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
verifyType == IndexTool.IndexVerifyType.ONLY) {
- VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+ IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
// For these options we start with verifying index rows
parallelizeIndexVerify(verificationPhaseResult);
nextVerificationResult.before.add(verificationPhaseResult);
@@ -1200,7 +1002,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
// We have rebuilt index row and now we need to verify them
- VerificationResult.PhaseResult verificationPhaseResult = new VerificationResult.PhaseResult();
+ IndexToolVerificationResult.PhaseResult verificationPhaseResult = new IndexToolVerificationResult.PhaseResult();
indexKeyToMutationMap.clear();
for (Map.Entry<byte[], Pair<Put, Delete>> entry: dataKeyToMutationMap.entrySet()) {
prepareIndexMutations(entry.getValue().getFirst(), entry.getValue().getSecond());
@@ -1450,7 +1252,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return indexMutations;
}
- private void prepareIndexMutations(Put put, Delete del) throws IOException{
+ @VisibleForTesting
+ public int prepareIndexMutations(Put put, Delete del) throws IOException {
List<Mutation> indexMutations = prepareIndexMutationsForRebuild(indexMaintainer, put, del);
for (Mutation mutation : indexMutations) {
byte[] indexRowKey = mutation.getRow();
@@ -1463,6 +1266,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
mutationList.add(mutation);
}
}
+ return 0;
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
new file mode 100644
index 0000000..ed92fad
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
+
+public class IndexToolVerificationResult {
+ public static class PhaseResult {
+ long validIndexRowCount = 0;
+ long expiredIndexRowCount = 0;
+ long missingIndexRowCount = 0;
+ long invalidIndexRowCount = 0;
+
+ public void add(PhaseResult phaseResult) {
+ validIndexRowCount += phaseResult.validIndexRowCount;
+ expiredIndexRowCount += phaseResult.expiredIndexRowCount;
+ missingIndexRowCount += phaseResult.missingIndexRowCount;
+ invalidIndexRowCount += phaseResult.invalidIndexRowCount;
+ }
+
+ public PhaseResult(){}
+
+ public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
+ long missingIndexRowCount, long invalidIndexRowCount) {
+ this.validIndexRowCount = validIndexRowCount;
+ this.expiredIndexRowCount = expiredIndexRowCount;
+ this.missingIndexRowCount = missingIndexRowCount;
+ this.invalidIndexRowCount = invalidIndexRowCount;
+ }
+
+ public long getTotalCount() {
+ return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
+ }
+
+ @Override
+ public String toString() {
+ return "PhaseResult{" +
+ "validIndexRowCount=" + validIndexRowCount +
+ ", expiredIndexRowCount=" + expiredIndexRowCount +
+ ", missingIndexRowCount=" + missingIndexRowCount +
+ ", invalidIndexRowCount=" + invalidIndexRowCount +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null) {
+ return false;
+ }
+ if (!(o instanceof PhaseResult)) {
+ return false;
+ }
+ PhaseResult pr = (PhaseResult) o;
+ return this.expiredIndexRowCount == pr.expiredIndexRowCount
+ && this.validIndexRowCount == pr.validIndexRowCount
+ && this.invalidIndexRowCount == pr.invalidIndexRowCount
+ && this.missingIndexRowCount == pr.missingIndexRowCount;
+ }
+
+ @Override
+ public int hashCode() {
+ long result = 17;
+ result = 31 * result + expiredIndexRowCount;
+ result = 31 * result + validIndexRowCount;
+ result = 31 * result + missingIndexRowCount;
+ result = 31 * result + invalidIndexRowCount;
+ return (int)result;
+ }
+ }
+
+ long scannedDataRowCount = 0;
+ long rebuiltIndexRowCount = 0;
+ PhaseResult before = new PhaseResult();
+ PhaseResult after = new PhaseResult();
+
+ @Override
+ public String toString() {
+ return "VerificationResult{" +
+ "scannedDataRowCount=" + scannedDataRowCount +
+ ", rebuiltIndexRowCount=" + rebuiltIndexRowCount +
+ ", before=" + before +
+ ", after=" + after +
+ '}';
+ }
+
+ public long getScannedDataRowCount() {
+ return scannedDataRowCount;
+ }
+
+ public long getRebuiltIndexRowCount() {
+ return rebuiltIndexRowCount;
+ }
+
+ public long getBeforeRebuildValidIndexRowCount() {
+ return before.validIndexRowCount;
+ }
+
+ public long getBeforeRebuildExpiredIndexRowCount() {
+ return before.expiredIndexRowCount;
+ }
+
+ public long getBeforeRebuildInvalidIndexRowCount() {
+ return before.invalidIndexRowCount;
+ }
+
+ public long getBeforeRebuildMissingIndexRowCount() {
+ return before.missingIndexRowCount;
+ }
+
+ public long getAfterRebuildValidIndexRowCount() {
+ return after.validIndexRowCount;
+ }
+
+ public long getAfterRebuildExpiredIndexRowCount() {
+ return after.expiredIndexRowCount;
+ }
+
+ public long getAfterRebuildInvalidIndexRowCount() {
+ return after.invalidIndexRowCount;
+ }
+
+ public long getAfterRebuildMissingIndexRowCount() {
+ return after.missingIndexRowCount;
+ }
+
+ private void addScannedDataRowCount(long count) {
+ this.scannedDataRowCount += count;
+ }
+
+ private void addRebuiltIndexRowCount(long count) {
+ this.rebuiltIndexRowCount += count;
+ }
+
+ private void addBeforeRebuildValidIndexRowCount(long count) {
+ before.validIndexRowCount += count;
+ }
+
+ private void addBeforeRebuildExpiredIndexRowCount(long count) {
+ before.expiredIndexRowCount += count;
+ }
+
+ private void addBeforeRebuildMissingIndexRowCount(long count) {
+ before.missingIndexRowCount += count;
+ }
+
+ private void addBeforeRebuildInvalidIndexRowCount(long count) {
+ before.invalidIndexRowCount += count;
+ }
+
+ private void addAfterRebuildValidIndexRowCount(long count) {
+ after.validIndexRowCount += count;
+ }
+
+ private void addAfterRebuildExpiredIndexRowCount(long count) {
+ after.expiredIndexRowCount += count;
+ }
+
+ private void addAfterRebuildMissingIndexRowCount(long count) {
+ after.missingIndexRowCount += count;
+ }
+
+ private void addAfterRebuildInvalidIndexRowCount(long count) {
+ after.invalidIndexRowCount += count;
+ }
+
+ private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
+ if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
+ AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES.length) == 0) {
+ return true;
+ }
+ return false;
+ }
+
+ private long getValue(Cell cell) {
+ return Long.parseLong(Bytes.toString(cell.getValueArray(),
+ cell.getValueOffset(), cell.getValueLength()));
+ }
+
+ private void update(Cell cell) {
+ if (CellUtil
+ .matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES)) {
+ addScannedDataRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES)) {
+ addRebuiltIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+ addBeforeRebuildValidIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+ addBeforeRebuildExpiredIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+ addBeforeRebuildMissingIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+ addBeforeRebuildInvalidIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
+ addAfterRebuildValidIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
+ addAfterRebuildExpiredIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES)) {
+ addAfterRebuildMissingIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
+ addAfterRebuildInvalidIndexRowCount(getValue(cell));
+ }
+ }
+
+ public static byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
+ // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
+ // Search for the place where the trailing 0xFFs start
+ int offset = rowKeyPrefix.length;
+ while (offset > 0) {
+ if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
+ break;
+ }
+ offset--;
+ }
+ if (offset == 0) {
+ // We got an 0xFFFF... (only FFs) stopRow value which is
+ // the last possible prefix before the end of the table.
+ // So set it to stop at the 'end of the table'
+ return HConstants.EMPTY_END_ROW;
+ }
+ // Copy the right length of the original
+ byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
+ // And increment the last one
+ newStopRow[newStopRow.length - 1]++;
+ return newStopRow;
+ }
+
+ public static IndexToolVerificationResult getVerificationResult(Table hTable, long ts)
+ throws IOException {
+ IndexToolVerificationResult verificationResult = new IndexToolVerificationResult();
+ byte[] startRowKey = Bytes.toBytes(Long.toString(ts));
+ byte[] stopRowKey = calculateTheClosestNextRowKeyForPrefix(startRowKey);
+ Scan scan = new Scan();
+ scan.setStartRow(startRowKey);
+ scan.setStopRow(stopRowKey);
+ ResultScanner scanner = hTable.getScanner(scan);
+ for (Result result = scanner.next(); result != null; result = scanner.next()) {
+ for (Cell cell : result.rawCells()) {
+ verificationResult.update(cell);
+ }
+ }
+ return verificationResult;
+ }
+
+ public boolean isVerificationFailed(IndexTool.IndexVerifyType verifyType) {
+ if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
+ return false;
+ } else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+ if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
+ return true;
+ }
+ } else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
+ if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void add(IndexToolVerificationResult verificationResult) {
+ scannedDataRowCount += verificationResult.scannedDataRowCount;
+ rebuiltIndexRowCount += verificationResult.rebuiltIndexRowCount;
+ before.add(verificationResult.before);
+ after.add(verificationResult.after);
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 98000f7..8d1b4db 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.coprocessor.TaskRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -63,8 +64,8 @@ public class PhoenixIndexImportDirectReducer extends
long ts = Long.valueOf(configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
Table hTable = connection.unwrap(PhoenixConnection.class).getQueryServices()
.getTable(IndexTool.RESULT_TABLE_NAME_BYTES);
- IndexRebuildRegionScanner.VerificationResult verificationResult =
- IndexRebuildRegionScanner.VerificationResult.getVerificationResult(hTable, ts);
+ IndexToolVerificationResult verificationResult =
+ IndexToolVerificationResult.getVerificationResult(hTable, ts);
context.getCounter(PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT).
setValue(verificationResult.getScannedDataRowCount());
context.getCounter(PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT).
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
new file mode 100644
index 0000000..2506609
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.EnvironmentEdge;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.when;
+
+public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
+
+ private static final int INDEX_TABLE_EXPIRY_SEC = 1;
+ private static final String UNEXPECTED_COLUMN = "0:UNEXPECTED_COLUMN";
+ public static final String FIRST_ID = "FIRST_ID";
+ public static final String SECOND_ID = "SECOND_ID";
+ public static final String FIRST_VALUE = "FIRST_VALUE";
+ public static final String SECOND_VALUE = "SECOND_VALUE";
+ public static final String
+ CREATE_TABLE_DDL = "CREATE TABLE IF NOT EXISTS %s (FIRST_ID BIGINT NOT NULL, "
+ + "SECOND_ID BIGINT NOT NULL, FIRST_VALUE VARCHAR(20), "
+ + "SECOND_VALUE INTEGER "
+ + "CONSTRAINT PK PRIMARY KEY(FIRST_ID, SECOND_ID)) COLUMN_ENCODED_BYTES=0";
+
+ public static final String
+ CREATE_INDEX_DDL = "CREATE INDEX %s ON %s (SECOND_VALUE) INCLUDE (FIRST_VALUE)";
+ public static final String COMPLETE_ROW_UPSERT = "UPSERT INTO %s VALUES (?,?,?,?)";
+ public static final String PARTIAL_ROW_UPSERT = "UPSERT INTO %s (%s, %s, %s) VALUES (?,?,?)";
+ public static final String DELETE_ROW_DML = "DELETE FROM %s WHERE %s = ? AND %s = ?";
+ public static final String INCLUDED_COLUMN = "0:FIRST_VALUE";
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ private enum TestType {
+ //set of mutations matching expected mutations
+ VALID_EXACT_MATCH,
+ //mix of delete and put mutations
+ VALID_MIX_MUTATIONS,
+ //only incoming unverified mutations
+ VALID_NEW_UNVERIFIED_MUTATIONS,
+ //extra mutations mimicking incoming mutations
+ VALID_MORE_MUTATIONS,
+ EXPIRED,
+ INVALID_EXTRA_CELL,
+ INVALID_EMPTY_CELL,
+ INVALID_CELL_VALUE,
+ INVALID_COLUMN
+ }
+
+ public static class UnitTestClock extends EnvironmentEdge {
+ long initialTime;
+ long delta;
+
+ public UnitTestClock(long delta) {
+ initialTime = System.currentTimeMillis() + delta;
+ this.delta = delta;
+ }
+
+ @Override
+ public long currentTime() {
+ return System.currentTimeMillis() + delta;
+ }
+ }
+
+ @Mock
+ Result indexRow;
+ @Mock
+ IndexRebuildRegionScanner rebuildScanner;
+ List<Mutation> actualMutationList;
+ String schema, table, dataTableFullName, index, indexTableFullName;
+ PTable pIndexTable, pDataTable;
+ Put put = null;
+ Delete delete = null;
+ PhoenixConnection pconn;
+ IndexToolVerificationResult.PhaseResult actualPR;
+ public Map<byte[], List<Mutation>> indexKeyToMutationMapLocal;
+ private IndexMaintainer indexMaintainer;
+
+ @Before
+ public void setup() throws SQLException, IOException {
+ MockitoAnnotations.initMocks(this);
+ createDBObject();
+ createMutationsWithUpserts();
+ initializeRebuildScannerAttributes();
+ initializeGlobalMockitoSetup();
+ }
+
+ public void createDBObject() throws SQLException {
+ try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+ schema = generateUniqueName();
+ table = generateUniqueName();
+ index = generateUniqueName();
+ dataTableFullName = SchemaUtil.getQualifiedTableName(schema, table);
+ indexTableFullName = SchemaUtil.getQualifiedTableName(schema, index);
+
+ conn.createStatement().execute(String.format(CREATE_TABLE_DDL, dataTableFullName));
+ conn.createStatement().execute(String.format(CREATE_INDEX_DDL, index, dataTableFullName));
+ conn.commit();
+
+ pconn = conn.unwrap(PhoenixConnection.class);
+ pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), indexTableFullName));
+ pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), dataTableFullName));
+ }
+ }
+
+ private void createMutationsWithUpserts() throws SQLException, IOException {
+ deleteRow(2, 3);
+ upsertPartialRow(2, 3, "abc");
+ upsertCompleteRow(2, 3, "hik", 8);
+ upsertPartialRow(2, 3, 10);
+ upsertPartialRow(2,3,4);
+ deleteRow(2, 3);
+ upsertPartialRow(2,3, "def");
+ upsertCompleteRow(2, 3, null, 20);
+ upsertPartialRow(2,3, "wert");
+ }
+
+ private void deleteRow(int key1, int key2) throws SQLException, IOException {
+ try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+ PreparedStatement ps =
+ conn.prepareStatement(
+ String.format(DELETE_ROW_DML, dataTableFullName, FIRST_ID, SECOND_ID));
+ ps.setInt(1, key1);
+ ps.setInt(2, key2);
+ ps.execute();
+ convertUpsertToMutations(conn);
+ }
+ }
+
+ private void upsertPartialRow(int key1, int key2, String val1)
+ throws SQLException, IOException {
+
+ try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+ PreparedStatement ps =
+ conn.prepareStatement(
+ String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+ FIRST_VALUE));
+ ps.setInt(1, key1);
+ ps.setInt(2, key2);
+ ps.setString(3, val1);
+ ps.execute();
+ convertUpsertToMutations(conn);
+ }
+ }
+
+ private void upsertPartialRow(int key1, int key2, int value1)
+ throws SQLException, IOException {
+
+ try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())){
+ PreparedStatement
+ ps =
+ conn.prepareStatement(
+ String.format(PARTIAL_ROW_UPSERT, dataTableFullName, FIRST_ID, SECOND_ID,
+ SECOND_VALUE));
+ ps.setInt(1, key1);
+ ps.setInt(2, key2);
+ ps.setInt(3, value1);
+ ps.execute();
+ convertUpsertToMutations(conn);
+ }
+ }
+
+ private void upsertCompleteRow(int key1, int key2, String val1
+ , int val2) throws SQLException, IOException {
+ try(Connection conn = DriverManager.getConnection(getUrl(), new Properties())) {
+ PreparedStatement
+ ps = conn.prepareStatement(String.format(COMPLETE_ROW_UPSERT, dataTableFullName));
+ ps.setInt(1, key1);
+ ps.setInt(2, key2);
+ ps.setString(3, val1);
+ ps.setInt(4, val2);
+ ps.execute();
+ convertUpsertToMutations(conn);
+ }
+ }
+
+ private void convertUpsertToMutations(Connection conn) throws SQLException, IOException {
+ Iterator<Pair<byte[],List<KeyValue>>>
+ dataTableNameAndMutationKeyValuesIter = PhoenixRuntime.getUncommittedDataIterator(conn);
+ Pair<byte[], List<KeyValue>> elem = dataTableNameAndMutationKeyValuesIter.next();
+ byte[] key = elem.getSecond().get(0).getRow();
+ long mutationTS = EnvironmentEdgeManager.currentTimeMillis();
+
+ for (KeyValue kv : elem.getSecond()) {
+ Cell cell =
+ CellUtil.createCell(kv.getRow(), kv.getFamily(), kv.getQualifier(),
+ mutationTS, kv.getType(), kv.getValue());
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null ) {
+ put = new Put(key);
+ }
+ put.add(cell);
+ } else {
+ if (delete == null) {
+ delete = new Delete(key);
+ }
+ delete.addDeleteMarker(cell);
+ }
+ }
+ }
+
+ private void initializeRebuildScannerAttributes() {
+ when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod();
+ when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any())).thenCallRealMethod();
+ when(rebuildScanner.setIndexKeyToMutationMap(Matchers.<Map>any())).thenCallRealMethod();
+ rebuildScanner.setIndexTableTTL(HConstants.FOREVER);
+ indexMaintainer = pIndexTable.getIndexMaintainer(pDataTable, pconn);
+ rebuildScanner.setIndexMaintainer(indexMaintainer);
+ }
+
+ private void initializeGlobalMockitoSetup() throws IOException {
+ //setup
+ when(rebuildScanner.getIndexRowKey(put)).thenCallRealMethod();
+ when(rebuildScanner.prepareIndexMutations(put, delete)).thenCallRealMethod();
+ when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(),
+ Matchers.<IndexToolVerificationResult.PhaseResult>any())).thenCallRealMethod();
+ doNothing().when(rebuildScanner)
+ .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+ Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString(),
+ Matchers.<byte[]>any(), Matchers.<byte[]>any());
+ doNothing().when(rebuildScanner)
+ .logToIndexToolOutputTable(Matchers.<byte[]>any(),Matchers.<byte[]>any(),
+ Mockito.anyLong(),Mockito.anyLong(), Mockito.anyString());
+
+ //populate the local map to use to create actual mutations
+ indexKeyToMutationMapLocal = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMapLocal);
+ rebuildScanner.prepareIndexMutations(put, delete);
+
+ //populate map to use in test code
+ Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
+ rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+ rebuildScanner.prepareIndexMutations(put, delete);
+ }
+
+ private byte[] getValidRowKey() {
+ return indexKeyToMutationMapLocal.entrySet().iterator().next().getKey();
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_validIndexRowCount_nonZero() throws IOException {
+ IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.VALID_EXACT_MATCH);
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_validIndexRowCount_moreActual() throws IOException {
+ IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.VALID_MORE_MUTATIONS);
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_allMix() throws IOException {
+ IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.VALID_MIX_MUTATIONS);
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_allUnverified() throws IOException {
+ IndexToolVerificationResult.PhaseResult expectedPR = getValidPhaseResult();
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.VALID_NEW_UNVERIFIED_MUTATIONS);
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException {
+ IndexToolVerificationResult.PhaseResult
+ expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0);
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.EXPIRED);
+ expireThisRow();
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_invalidIndexRowCount_cellValue() throws IOException {
+ IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.INVALID_CELL_VALUE);
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_invalidIndexRowCount_emptyCell() throws IOException {
+ IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.INVALID_EMPTY_CELL);
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_invalidIndexRowCount_diffColumn() throws IOException {
+ IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.INVALID_COLUMN);
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_invalidIndexRowCount_extraCell() throws IOException {
+ IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
+ for (Map.Entry<byte[], List<Mutation>>
+ entry : indexKeyToMutationMapLocal.entrySet()) {
+ initializeLocalMockitoSetup(entry, TestType.INVALID_EXTRA_CELL);
+ //test code
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+
+ assertTrue(actualPR.equals(expectedPR));
+ }
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_expectedMutations_null() throws IOException {
+ when(indexRow.getRow()).thenReturn(Bytes.toBytes(1));
+ exceptionRule.expect(DoNotRetryIOException.class);
+ exceptionRule.expectMessage(IndexRebuildRegionScanner.NO_EXPECTED_MUTATION);
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_actualMutations_null() throws IOException {
+ byte [] validRowKey = getValidRowKey();
+ when(indexRow.getRow()).thenReturn(validRowKey);
+ when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(null);
+ exceptionRule.expect(DoNotRetryIOException.class);
+ exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+ }
+
+ @Test
+ public void testVerifySingleIndexRow_actualMutations_empty() throws IOException {
+ byte [] validRowKey = getValidRowKey();
+ when(indexRow.getRow()).thenReturn(validRowKey);
+ actualMutationList = new ArrayList<>();
+ when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+ exceptionRule.expect(DoNotRetryIOException.class);
+ exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
+ rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+ }
+
+ private IndexToolVerificationResult.PhaseResult getValidPhaseResult() {
+ return new IndexToolVerificationResult.PhaseResult(1,0,0,0);
+ }
+
+ private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() {
+ return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1);
+ }
+
+ private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry,
+ TestType testType)
+ throws IOException {
+ actualPR = new IndexToolVerificationResult.PhaseResult();
+ byte[] indexKey = entry.getKey();
+ when(indexRow.getRow()).thenReturn(indexKey);
+ actualMutationList = buildActualIndexMutationsList(testType);
+ when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
+ }
+
+ private List<Mutation> buildActualIndexMutationsList(TestType testType) {
+ List<Mutation> actualMutations = new ArrayList<>();
+ actualMutations.addAll(indexKeyToMutationMapLocal.get(indexRow.getRow()));
+ if(testType.equals(TestType.EXPIRED)) {
+ return actualMutations;
+ }
+ if(testType.toString().startsWith("VALID")) {
+ return getValidActualMutations(testType, actualMutations);
+ }
+ if(testType.toString().startsWith("INVALID")) {
+ return getInvalidActualMutations(testType, actualMutations);
+ }
+ return null;
+ }
+
+ private List <Mutation> getValidActualMutations(TestType testType,
+ List<Mutation> actualMutations) {
+ List <Mutation> newActualMutations = new ArrayList<>();
+ if(testType.equals(TestType.VALID_EXACT_MATCH)) {
+ return actualMutations;
+ }
+ if (testType.equals(TestType.VALID_MIX_MUTATIONS)) {
+ newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+ newActualMutations.add(getDeleteMutation(actualMutations.get(0), new Long(1)));
+ newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+ }
+ if (testType.equals(TestType.VALID_NEW_UNVERIFIED_MUTATIONS)) {
+ newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+ newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+ newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+ newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), new Long(1)));
+ }
+ newActualMutations.addAll(actualMutations);
+ if(testType.equals(TestType.VALID_MORE_MUTATIONS)) {
+ newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), null));
+ newActualMutations.add(getDeleteMutation(actualMutations.get(0), null));
+ newActualMutations.add(getDeleteMutation(actualMutations.get(0), new Long(1)));
+ newActualMutations.add(getUnverifiedPutMutation(actualMutations.get(0), new Long(1)));
+ }
+ return newActualMutations;
+ }
+
+ private List <Mutation> getInvalidActualMutations(TestType testType,
+ List<Mutation> actualMutations) {
+ List <Mutation> newActualMutations = new ArrayList<>();
+ newActualMutations.addAll(actualMutations);
+ for (Mutation m : actualMutations) {
+ newActualMutations.remove(m);
+ NavigableMap<byte[], List<Cell>> familyCellMap = m.getFamilyCellMap();
+ List<Cell> cellList = familyCellMap.firstEntry().getValue();
+ List<Cell> newCellList = new ArrayList<>();
+ byte[] fam = CellUtil.cloneFamily(cellList.get(0));
+ for (Cell c : cellList) {
+ infiltrateCell(c, newCellList, testType);
+ }
+ familyCellMap.put(fam, newCellList);
+ m.setFamilyCellMap(familyCellMap);
+ newActualMutations.add(m);
+ }
+ return newActualMutations;
+ }
+
+ private void infiltrateCell(Cell c, List<Cell> newCellList, TestType e) {
+ Cell newCell;
+ Cell emptyCell;
+ switch(e) {
+ case INVALID_COLUMN:
+ newCell =
+ CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+ Bytes.toBytes(UNEXPECTED_COLUMN),
+ EnvironmentEdgeManager.currentTimeMillis(),
+ KeyValue.Type.Put.getCode(), Bytes.toBytes("zxcv"));
+ newCellList.add(newCell);
+ newCellList.add(c);
+ break;
+ case INVALID_CELL_VALUE:
+ if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+ newCell = getCellWithPut(c);
+ emptyCell = getUnverifiedEmptyCell(c);
+ newCellList.add(newCell);
+ newCellList.add(emptyCell);
+ } else {
+ newCellList.add(c);
+ }
+ break;
+ case INVALID_EMPTY_CELL:
+ if (CellUtil.matchingQualifier(c, EMPTY_COLUMN_BYTES)) {
+ newCell =
+ CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+ CellUtil.cloneQualifier(c), c.getTimestamp(),
+ KeyValue.Type.Delete.getCode(), VERIFIED_BYTES);
+ newCellList.add(newCell);
+ } else {
+ newCellList.add(c);
+ }
+ break;
+ case INVALID_EXTRA_CELL:
+ newCell = getCellWithPut(c);
+ emptyCell = getUnverifiedEmptyCell(c);
+ newCellList.add(newCell);
+ newCellList.add(emptyCell);
+ newCellList.add(c);
+ }
+ }
+
+ private Cell getUnverifiedEmptyCell(Cell c) {
+ return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
+ indexMaintainer.getEmptyKeyValueQualifier(),
+ EnvironmentEdgeManager.currentTimeMillis(),
+ KeyValue.Type.Put.getCode(), UNVERIFIED_BYTES);
+ }
+
+ private Cell getCellWithPut(Cell c) {
+ return CellUtil.createCell(CellUtil.cloneRow(c),
+ CellUtil.cloneFamily(c), Bytes.toBytes(INCLUDED_COLUMN),
+ EnvironmentEdgeManager.currentTimeMillis(), KeyValue.Type.Put.getCode(),
+ Bytes.toBytes("zxcv"));
+ }
+
+ private void expireThisRow() {
+ rebuildScanner.setIndexTableTTL(INDEX_TABLE_EXPIRY_SEC);
+ UnitTestClock expiryClock = new UnitTestClock(5000);
+ EnvironmentEdgeManager.injectEdge(expiryClock);
+ }
+
+ private Mutation getDeleteMutation(Mutation orig, Long ts) {
+ Mutation m = new Delete(orig.getRow());
+ List<Cell> origList = orig.getFamilyCellMap().firstEntry().getValue();
+ ts = ts == null ? EnvironmentEdgeManager.currentTimeMillis() : ts;
+ Cell c = getNewPutCell(orig, origList, ts, KeyValue.Type.DeleteFamilyVersion);
+ Cell empty = getEmptyCell(orig, origList, ts, KeyValue.Type.Put, true);
+ byte[] fam = CellUtil.cloneFamily(origList.get(0));
+ List<Cell> famCells = Lists.newArrayList();
+ m.getFamilyCellMap().put(fam, famCells);
+ famCells.add(c);
+ famCells.add(empty);
+ return m;
+ }
+
+ private Mutation getUnverifiedPutMutation(Mutation orig, Long ts) {
+ Mutation m = new Put(orig.getRow());
+ if (orig.getAttributesMap() != null) {
+ for (Map.Entry<String,byte[]> entry : orig.getAttributesMap().entrySet()) {
+ m.setAttribute(entry.getKey(), entry.getValue());
+ }
+ }
+ List<Cell> origList = orig.getFamilyCellMap().firstEntry().getValue();
+ ts = ts == null ? EnvironmentEdgeManager.currentTimeMillis() : ts;
+ Cell c = getNewPutCell(orig, origList, ts, KeyValue.Type.Put);
+ Cell empty = getEmptyCell(orig, origList, ts, KeyValue.Type.Put, false);
+ byte[] fam = CellUtil.cloneFamily(origList.get(0));
+ List<Cell> famCells = Lists.newArrayList();
+ m.getFamilyCellMap().put(fam, famCells);
+ famCells.add(c);
+ famCells.add(empty);
+ return m;
+ }
+
+ private Cell getEmptyCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type,
+ boolean verified) {
+ return CellUtil.createCell(orig.getRow(), CellUtil.cloneFamily(origList.get(0)),
+ indexMaintainer.getEmptyKeyValueQualifier(),
+ ts, type.getCode(), verified ? VERIFIED_BYTES : UNVERIFIED_BYTES);
+ }
+
+ private Cell getNewPutCell(Mutation orig, List<Cell> origList, Long ts, KeyValue.Type type) {
+ return CellUtil.createCell(orig.getRow(),
+ CellUtil.cloneFamily(origList.get(0)), Bytes.toBytes(INCLUDED_COLUMN),
+ ts, type.getCode(), Bytes.toBytes("asdfg"));
+ }
+}