You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/04/07 03:52:18 UTC
[phoenix] branch 4.x updated: PHOENIX-5735 IndexTool's inline
verification should not verify rows beyond max lookback age
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 339a22d PHOENIX-5735 IndexTool's inline verification should not verify rows beyond max lookback age
339a22d is described below
commit 339a22d29b3c5f268935c8c013aaa1e1f4bbbf69
Author: Weiming Wang <wa...@live.cn>
AuthorDate: Wed Apr 1 11:09:09 2020 -0700
PHOENIX-5735 IndexTool's inline verification should not verify rows beyond max lookback age
Signed-off-by: Kadir <ko...@salesforce.com>
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 51 ++++--
.../hadoop/hbase/regionserver/ScanInfoUtil.java | 2 +-
.../coprocessor/IndexRebuildRegionScanner.java | 173 +++++++++++++++------
.../coprocessor/IndexToolVerificationResult.java | 78 ++++++++--
.../apache/phoenix/mapreduce/index/IndexTool.java | 8 +
.../index/PhoenixIndexImportDirectReducer.java | 8 +
.../index/PhoenixIndexToolJobCounters.java | 6 +-
.../phoenix/index/VerifySingleIndexRowTest.java | 168 +++++++++++++++-----
8 files changed, 376 insertions(+), 118 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 119c806..71cb530 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -62,6 +62,7 @@ import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -81,19 +82,21 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -251,6 +254,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
dropIndexToolTables(conn);
}
}
@@ -356,6 +361,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
} finally {
conn.close();
}
@@ -412,7 +419,9 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
@@ -429,10 +438,14 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(2 * NROWS, actualRowCount);
dropIndexToolTables(conn);
@@ -593,9 +606,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
null, -1, IndexTool.IndexVerifyType.AFTER);
// The index tool output table should report that there is a missing index row
Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
- byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
- assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
- expectedValueBytes, 0, expectedValueBytes.length) == 0);
+ try {
+ String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+ String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+ } catch(Exception ex){
+ Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+ }
IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
dropIndexToolTables(conn);
}
@@ -627,9 +644,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, -1, IndexTool.IndexVerifyType.ONLY);
Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
- byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
- assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
- expectedValueBytes, 0, expectedValueBytes.length) == 0);
+ try {
+ String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+ String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+ } catch(Exception ex) {
+ Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+ }
// Delete the output table for the next test
dropIndexToolTables(conn);
// Run the index tool to populate the index while verifying rows
@@ -668,9 +689,13 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
Cell cell =
getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
indexTableFullName);
- byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
- assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(),
- cell.getValueLength(), expectedValueBytes, 0, expectedValueBytes.length) == 0);
+ try {
+ String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+ String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ assertTrue(expectedErrorMsg.equals(actualErrorMsg));
+ } catch(Exception ex) {
+ Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
+ }
// Run the index tool to populate the index while verifying rows
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
index e70ffc7..7d54228 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfoUtil.java
@@ -121,7 +121,7 @@ public class ScanInfoUtil {
DEFAULT_PHOENIX_MAX_LOOKBACK_AGE));
}
- private static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){
+ public static boolean isMaxLookbackTimeEnabled(long maxLookbackTime){ // visibility widened from private to public by this change
return maxLookbackTime > 0L; // enabled only for a strictly positive lookback age; 0 or negative means disabled
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 0bd8c7c..2bdaf1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -17,21 +17,10 @@
*/
package org.apache.phoenix.coprocessor;
-import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
-import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.*;
import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -48,7 +37,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
-import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import com.google.common.annotations.VisibleForTesting;
@@ -72,8 +60,9 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.ScanRanges;
@@ -109,6 +98,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
public class IndexRebuildRegionScanner extends BaseRegionScanner {
+ public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack";
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
@@ -154,6 +144,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private int singleRowRebuildReturnCode;
private Map<byte[], NavigableSet<byte[]>> familyMap;
private byte[][] viewConstants;
+ private long maxLookBackInMills;
@VisibleForTesting
public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
@@ -219,6 +210,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
}
}
+
+ maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config);
}
private void setReturnCodeForSingleRowRebuild() throws IOException {
@@ -302,6 +295,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.missingIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.invalidIndexRowCount)));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackMissingIndexRowCount)));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.before.beyondMaxLookBackInvalidIndexRowCount)));
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
@@ -312,6 +309,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.missingIndexRowCount)));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.invalidIndexRowCount)));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackMissingIndexRowCount)));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
+ scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.after.beyondMaxLookBackInvalidIndexRowCount)));
}
resultHTable.put(put);
}
@@ -370,6 +371,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return 0;
}
+ @VisibleForTesting
+ public long setMaxLookBackInMills(long maxLookBackInMills) { // test hook: overwrites the max lookback window (in milliseconds) that the constructor reads from the configuration
+ this.maxLookBackInMills = maxLookBackInMills;
+ return 0; // the return value carries no information; it is always 0
+ }
+
public static class SimpleValueGetter implements ValueGetter {
final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
final Put put;
@@ -516,13 +523,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return null;
}
- private boolean isMatchingMutation(Mutation expected, Mutation actual, int iteration) throws IOException {
+ private void logMismatch(Mutation expected, Mutation actual, int iteration) throws IOException {
if (getTimestamp(expected) != getTimestamp(actual)) {
String errorMsg = "Not matching timestamp";
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
errorMsg, null, null);
- return false;
+ return;
}
int expectedCellCount = 0;
for (List<Cell> cells : expected.getFamilyCellMap().values()) {
@@ -539,14 +546,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
String errorMsg = "Missing cell (in iteration " + iteration + ") " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual), errorMsg);
- return false;
+ return;
}
if (!CellUtil.matchingValue(actualCell, expectedCell)) {
String errorMsg = "Not matching value (in iteration " + iteration + ") for " + Bytes.toString(family) + ":" + Bytes.toString(qualifier);
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
- return false;
+ return;
}
}
}
@@ -562,6 +569,40 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(expected.getRow()), viewConstants);
logToIndexToolOutputTable(dataKey, expected.getRow(), getTimestamp(expected), getTimestamp(actual),
errorMsg);
+ }
+ }
+
+ private boolean isMatchingMutation(Mutation expected, Mutation actual) throws IOException {
+ if (getTimestamp(expected) != getTimestamp(actual)) {
+ return false;
+ }
+ int expectedCellCount = 0;
+ for (List<Cell> cells : expected.getFamilyCellMap().values()) {
+ if (cells == null) {
+ continue;
+ }
+ for (Cell expectedCell : cells) {
+ expectedCellCount++;
+ byte[] family = CellUtil.cloneFamily(expectedCell);
+ byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+ Cell actualCell = getCell(actual, family, qualifier);
+ if (actualCell == null ||
+ !CellUtil.matchingType(expectedCell, actualCell)) {
+ return false;
+ }
+ if (!CellUtil.matchingValue(actualCell, expectedCell)) {
+ return false;
+ }
+ }
+ }
+ int actualCellCount = 0;
+ for (List<Cell> cells : actual.getFamilyCellMap().values()) {
+ if (cells == null) {
+ continue;
+ }
+ actualCellCount += cells.size();
+ }
+ if (expectedCellCount != actualCellCount) {
return false;
}
return true;
@@ -698,7 +739,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
while (iterator.hasNext()) {
Mutation mutation = iterator.next();
if ((mutation instanceof Put && !isVerified((Put) mutation)) ||
- (mutation instanceof Delete && isDeleteFamilyVersion(mutation))) {
+ (mutation instanceof Delete && !isDeleteFamily(mutation))) {
iterator.remove();
} else {
if (previous != null && getTimestamp(previous) == getTimestamp(mutation) &&
@@ -796,15 +837,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
repairActualMutationList(actualMutationList, expectedMutationList);
}
cleanUpActualMutationList(actualMutationList);
- long currentTime = EnvironmentEdgeManager.currentTime();
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
int actualIndex = 0;
int expectedIndex = 0;
- int matchingCount = 0;
int expectedSize = expectedMutationList.size();
int actualSize = actualMutationList.size();
Mutation expected = null;
Mutation previousExpected;
- Mutation actual;
+ Mutation actual = null;
while (expectedIndex < expectedSize && actualIndex <actualSize) {
previousExpected = expected;
expected = expectedMutationList.get(expectedIndex);
@@ -822,7 +862,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
if (previousExpected instanceof Delete) {
// Between an expected delete and put, there can be one or more deletes due to
// concurrent mutations or data table write failures. Skip all of them if any
- while (getTimestamp(actual) > getTimestamp(expected) && (actual instanceof Delete)) {
+ // There cannot be any actual delete mutation between two expected put mutations.
+ while (getTimestamp(actual) >= getTimestamp(expected) && actual instanceof Delete) {
actualIndex++;
if (actualIndex == actualSize) {
break;
@@ -836,10 +877,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
if (actual instanceof Delete) {
break;
}
- if (isMatchingMutation(expected, actual, expectedIndex)) {
+ if (isMatchingMutation(expected, actual)) {
expectedIndex++;
actualIndex++;
- matchingCount++;
continue;
}
} else { // expected instanceof Delete
@@ -859,39 +899,54 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
(actual instanceof Delete && isDeleteFamily(actual))) {
expectedIndex++;
actualIndex++;
- matchingCount++;
continue;
}
}
- if (matchingCount > 0) {
- break;
+ break;
+ }
+
+ if (expectedIndex == expectedSize ){
+ // every expected mutation has its matching one in the actual list.
+ verificationPhaseResult.validIndexRowCount++;
+ return true;
+ }
+
+ if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){
+ if (expectedIndex > 0) {
+ // The first unmatched expected mutation is beyond the max lookback window. Because expectedIndex > 0, the latest
+ // expected mutation has already matched, which is sufficient: reading the older versions would require an SCN query.
+ verificationPhaseResult.validIndexRowCount++;
+ return true;
}
- verificationPhaseResult.invalidIndexRowCount++;
+
+ // All expected mutations are beyond the maxLookBack window and none of them has a match in the actual list.
+ // This may be caused by a real bug or by compaction on the index table.
+ // We report it as a failure so that the "before" option can trigger an index rebuild for this row.
+ // This repair is required when there is only one index row for a given data table row and the timestamp of
+ // that row can be beyond maxLookBack.
+ verificationPhaseResult.beyondMaxLookBackInvalidIndexRowCount++;
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
+ String errorMsg = String.format("Expect %1$s mutations but got %2$s (beyond maxLookBack)",
+ expectedSize,
+ actualSize);
+ logToIndexToolOutputTable(dataKey, indexRow.getRow(),
+ getTimestamp(expectedMutationList.get(expectedIndex)),
+ 0, errorMsg);
return false;
}
- if ((expectedIndex != expectedSize) || actualIndex != actualSize) {
- if (matchingCount > 0) {
- if (verifyType != IndexTool.IndexVerifyType.ONLY) {
- // We do not consider this as a verification issue but log it for further information.
- // This may happen due to compaction
- byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
- String errorMsg = "Expected to find " + expectedMutationList.size() + " mutations but got "
- + actualMutationList.size();
- logToIndexToolOutputTable(dataKey, indexRow.getRow(),
- getTimestamp(expectedMutationList.get(0)),
- getTimestamp(actualMutationList.get(0)), errorMsg);
- }
- } else {
+ else {
+ if (actualIndex < actualSize && actual instanceof Put && expected instanceof Put){
+ logMismatch(expected, actual, expectedIndex);
+ }
+ else {
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRow.getRow()), viewConstants);
String errorMsg = "Not matching index row";
logToIndexToolOutputTable(dataKey, indexRow.getRow(),
getTimestamp(expectedMutationList.get(0)), 0L, errorMsg);
- verificationPhaseResult.invalidIndexRowCount++;
- return false;
}
+ verificationPhaseResult.invalidIndexRowCount++;
+ return false;
}
- verificationPhaseResult.validIndexRowCount++;
- return true;
}
private static long getMaxTimestamp(Pair<Put, Delete> pair) {
@@ -937,7 +992,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
// TODO: metrics for expired rows
if (!keys.isEmpty()) {
Iterator<KeyRange> itr = keys.iterator();
- long currentTime = EnvironmentEdgeManager.currentTime();
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
while(itr.hasNext()) {
KeyRange keyRange = itr.next();
byte[] key = keyRange.getLowerRange();
@@ -950,20 +1005,30 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
if (keys.size() > 0) {
for (KeyRange keyRange : keys) {
- String errorMsg = "Missing index row";
byte[] key = keyRange.getLowerRange();
List<Mutation> mutationList = indexKeyToMutationMap.get(key);
- if (mutationList.get(mutationList.size() - 1) instanceof Delete) {
+ Mutation mutation = mutationList.get(mutationList.size() - 1);
+ if (mutation instanceof Delete) {
continue;
}
+ long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ String errorMsg;
+ if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){
+ errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
+ verificationPhaseResult.beyondMaxLookBackMissingIndexRowCount++;
+ }
+ else {
+ errorMsg = "Missing index row";
+ verificationPhaseResult.missingIndexRowCount++;
+ }
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
logToIndexToolOutputTable(dataKey,
keyRange.getLowerRange(),
getMaxTimestamp(dataKeyToMutationMap.get(dataKey)),
- getTimestamp(mutationList.get(mutationList.size() - 1)), errorMsg);
- verificationPhaseResult.missingIndexRowCount++;
+ getTimestamp(mutation), errorMsg);
}
}
+
keys.addAll(invalidKeys);
}
@@ -974,6 +1039,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
}
+ private boolean isTimestampBeyondMaxLookBack(long currentTime, long tsToCheck){ // true when tsToCheck is older than (currentTime - maxLookBackInMills)
+ if (!ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills))
+ return true; // max lookback disabled: every timestamp is reported as beyond the window (NOTE(review): presumably because nothing is guaranteed to be retained then; confirm)
+ return tsToCheck < (currentTime - maxLookBackInMills); // strictly older than the window start; a timestamp exactly at the boundary is still inside
+ }
+
private void addVerifyTask(final List<KeyRange> keys,
final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
tasks.add(new Task<Boolean>() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
index ed92fad..1fbb866 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -30,17 +30,7 @@ import org.apache.phoenix.mapreduce.index.IndexTool;
import java.io.IOException;
import java.util.Arrays;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.REBUILT_INDEX_ROW_COUNT_BYTES;
-import static org.apache.phoenix.mapreduce.index.IndexTool.RESULT_TABLE_COLUMN_FAMILY;
-import static org.apache.phoenix.mapreduce.index.IndexTool.SCANNED_DATA_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexTool.*;
public class IndexToolVerificationResult {
public static class PhaseResult {
@@ -48,26 +38,33 @@ public class IndexToolVerificationResult {
long expiredIndexRowCount = 0;
long missingIndexRowCount = 0;
long invalidIndexRowCount = 0;
+ long beyondMaxLookBackMissingIndexRowCount = 0;
+ long beyondMaxLookBackInvalidIndexRowCount = 0;
public void add(PhaseResult phaseResult) {
validIndexRowCount += phaseResult.validIndexRowCount;
expiredIndexRowCount += phaseResult.expiredIndexRowCount;
missingIndexRowCount += phaseResult.missingIndexRowCount;
invalidIndexRowCount += phaseResult.invalidIndexRowCount;
+ beyondMaxLookBackMissingIndexRowCount += phaseResult.beyondMaxLookBackMissingIndexRowCount;
+ beyondMaxLookBackInvalidIndexRowCount += phaseResult.beyondMaxLookBackInvalidIndexRowCount;
}
public PhaseResult(){}
public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
- long missingIndexRowCount, long invalidIndexRowCount) {
+ long missingIndexRowCount, long invalidIndexRowCount,
+ long beyondMaxLookBackMissingIndexRowCount, long beyondMaxLookBackInvalidIndexRowCount) {
this.validIndexRowCount = validIndexRowCount;
this.expiredIndexRowCount = expiredIndexRowCount;
this.missingIndexRowCount = missingIndexRowCount;
this.invalidIndexRowCount = invalidIndexRowCount;
+ this.beyondMaxLookBackInvalidIndexRowCount = beyondMaxLookBackInvalidIndexRowCount;
+ this.beyondMaxLookBackMissingIndexRowCount = beyondMaxLookBackMissingIndexRowCount;
}
public long getTotalCount() {
- return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount;
+ return validIndexRowCount + expiredIndexRowCount + missingIndexRowCount + invalidIndexRowCount + beyondMaxLookBackMissingIndexRowCount + beyondMaxLookBackInvalidIndexRowCount;
}
@Override
@@ -77,7 +74,8 @@ public class IndexToolVerificationResult {
", expiredIndexRowCount=" + expiredIndexRowCount +
", missingIndexRowCount=" + missingIndexRowCount +
", invalidIndexRowCount=" + invalidIndexRowCount +
- '}';
+ ", beyondMaxLookBackMissingIndexRowCount=" + beyondMaxLookBackMissingIndexRowCount +
+ ", beyondMaxLookBackInvalidIndexRowCount=" + beyondMaxLookBackInvalidIndexRowCount;
}
@Override
@@ -92,7 +90,9 @@ public class IndexToolVerificationResult {
return this.expiredIndexRowCount == pr.expiredIndexRowCount
&& this.validIndexRowCount == pr.validIndexRowCount
&& this.invalidIndexRowCount == pr.invalidIndexRowCount
- && this.missingIndexRowCount == pr.missingIndexRowCount;
+ && this.missingIndexRowCount == pr.missingIndexRowCount
+ && this.beyondMaxLookBackInvalidIndexRowCount == pr.beyondMaxLookBackInvalidIndexRowCount
+ && this.beyondMaxLookBackMissingIndexRowCount == pr.beyondMaxLookBackMissingIndexRowCount;
}
@Override
@@ -102,6 +102,8 @@ public class IndexToolVerificationResult {
result = 31 * result + validIndexRowCount;
result = 31 * result + missingIndexRowCount;
result = 31 * result + invalidIndexRowCount;
+ result = 31 * result + beyondMaxLookBackMissingIndexRowCount;
+ result = 31 * result + beyondMaxLookBackInvalidIndexRowCount;
return (int)result;
}
}
@@ -141,6 +143,14 @@ public class IndexToolVerificationResult {
return before.invalidIndexRowCount;
}
+ /**
+  * Returns the before-rebuild phase's {@code beyondMaxLookBackMissingIndexRowCount}.
+  *
+  * @return the current value of {@code before.beyondMaxLookBackMissingIndexRowCount}
+  */
+ public long getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount() {
+     return before.beyondMaxLookBackMissingIndexRowCount;
+ }
+
+ /**
+  * Returns the before-rebuild phase's {@code beyondMaxLookBackInvalidIndexRowCount}.
+  *
+  * @return the current value of {@code before.beyondMaxLookBackInvalidIndexRowCount}
+  */
+ public long getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount() {
+     return before.beyondMaxLookBackInvalidIndexRowCount;
+ }
+
public long getBeforeRebuildMissingIndexRowCount() {
return before.missingIndexRowCount;
}
@@ -161,6 +171,14 @@ public class IndexToolVerificationResult {
return after.missingIndexRowCount;
}
+ /**
+  * Returns the after-rebuild phase's {@code beyondMaxLookBackMissingIndexRowCount}.
+  *
+  * @return the current value of {@code after.beyondMaxLookBackMissingIndexRowCount}
+  */
+ public long getAfterRebuildBeyondMaxLookBackMissingIndexRowCount() {
+     return after.beyondMaxLookBackMissingIndexRowCount;
+ }
+
+ /**
+  * Returns the after-rebuild phase's {@code beyondMaxLookBackInvalidIndexRowCount}.
+  *
+  * @return the current value of {@code after.beyondMaxLookBackInvalidIndexRowCount}
+  */
+ public long getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount() {
+     return after.beyondMaxLookBackInvalidIndexRowCount;
+ }
+
private void addScannedDataRowCount(long count) {
this.scannedDataRowCount += count;
}
@@ -185,6 +203,14 @@ public class IndexToolVerificationResult {
before.invalidIndexRowCount += count;
}
+ /**
+  * Adds {@code count} to the before-rebuild phase's {@code beyondMaxLookBackMissingIndexRowCount}.
+  *
+  * @param count amount added to the running total
+  */
+ private void addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(long count) {
+     before.beyondMaxLookBackMissingIndexRowCount =
+             before.beyondMaxLookBackMissingIndexRowCount + count;
+ }
+
+ /**
+  * Adds {@code count} to the before-rebuild phase's {@code beyondMaxLookBackInvalidIndexRowCount}.
+  *
+  * @param count amount added to the running total
+  */
+ private void addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) {
+     before.beyondMaxLookBackInvalidIndexRowCount =
+             before.beyondMaxLookBackInvalidIndexRowCount + count;
+ }
+
private void addAfterRebuildValidIndexRowCount(long count) {
after.validIndexRowCount += count;
}
@@ -201,6 +227,14 @@ public class IndexToolVerificationResult {
after.invalidIndexRowCount += count;
}
+ /**
+  * Adds {@code count} to the after-rebuild phase's {@code beyondMaxLookBackMissingIndexRowCount}.
+  *
+  * @param count amount added to the running total
+  */
+ private void addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(long count) {
+     after.beyondMaxLookBackMissingIndexRowCount =
+             after.beyondMaxLookBackMissingIndexRowCount + count;
+ }
+
+ /**
+  * Adds {@code count} to the after-rebuild phase's {@code beyondMaxLookBackInvalidIndexRowCount}.
+  *
+  * @param count amount added to the running total
+  */
+ private void addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(long count) {
+     after.beyondMaxLookBackInvalidIndexRowCount =
+             after.beyondMaxLookBackInvalidIndexRowCount + count;
+ }
+
private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
@@ -229,6 +263,10 @@ public class IndexToolVerificationResult {
addBeforeRebuildMissingIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
addBeforeRebuildInvalidIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES)) {
+ addBeforeRebuildBeyondMaxLookBackMissingIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES)) {
+ addBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
addAfterRebuildValidIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
@@ -237,6 +275,10 @@ public class IndexToolVerificationResult {
addAfterRebuildMissingIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES)) {
addAfterRebuildInvalidIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES)) {
+ addAfterRebuildBeyondMaxLookBackMissingIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES)) {
+ addAfterRebuildBeyondMaxLookBackInvalidIndexRowCount(getValue(cell));
}
}
@@ -284,11 +326,13 @@ public class IndexToolVerificationResult {
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.NONE) {
return false;
} else if (verifyType == IndexTool.IndexVerifyType.ONLY) {
- if (before.invalidIndexRowCount + before.missingIndexRowCount > 0) {
+ if (before.invalidIndexRowCount + before.missingIndexRowCount
+ + before.beyondMaxLookBackInvalidIndexRowCount + before.beyondMaxLookBackMissingIndexRowCount > 0) {
return true;
}
} else if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
- if (after.invalidIndexRowCount + after.missingIndexRowCount > 0) {
+ if (after.invalidIndexRowCount + after.missingIndexRowCount
+ + after.beyondMaxLookBackInvalidIndexRowCount + after.beyondMaxLookBackMissingIndexRowCount > 0) {
return true;
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 0347811..95703a4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -187,6 +187,10 @@ public class IndexTool extends Configured implements Tool {
public final static byte[] BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT);
public static String BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT = "BeforeRebuildInvalidIndexRowCount";
public final static byte[] BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT);
+ // Names and byte forms of the before-rebuild "beyond max lookback" counters. The byte
+ // arrays are declared final, like the sibling *_BYTES qualifiers, so they cannot be reassigned.
+ public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackMissingIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
+ public static String BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "BeforeRebuildBeyondMaxLookBackInvalidIndexRowCount";
+ public final static byte[] BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
public static String AFTER_REBUILD_VALID_INDEX_ROW_COUNT = "AfterValidExpiredIndexRowCount";
public final static byte[] AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_VALID_INDEX_ROW_COUNT);
public static String AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT = "AfterRebuildExpiredIndexRowCount";
@@ -195,6 +199,10 @@ public class IndexTool extends Configured implements Tool {
public final static byte[] AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT);
public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT = "AfterRebuildInvalidIndexRowCount";
public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT);
+ // Names and byte forms of the after-rebuild "beyond max lookback" counters. The byte
+ // arrays are declared final, like the sibling *_BYTES qualifiers, so they cannot be reassigned.
+ public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackMissingIndexRowCount";
+ public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT);
+ public static String AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT = "AfterRebuildBeyondMaxLookBackInvalidIndexRowCount";
+ public final static byte[] AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT);
public static String VERIFICATION_PHASE = "Phase";
public final static byte[] VERIFICATION_PHASE_BYTES = Bytes.toBytes(VERIFICATION_PHASE);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 8d1b4db..a24e3ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -80,6 +80,10 @@ public class PhoenixIndexImportDirectReducer extends
setValue(verificationResult.getBeforeRebuildMissingIndexRowCount());
context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).
setValue(verificationResult.getBeforeRebuildInvalidIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).
+ setValue(verificationResult.getBeforeRebuildBeyondMaxLookBackMissingIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).
+ setValue(verificationResult.getBeforeRebuildBeyondMaxLookBackInvalidIndexRowCount());
}
if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT).
@@ -90,6 +94,10 @@ public class PhoenixIndexImportDirectReducer extends
setValue(verificationResult.getAfterRebuildMissingIndexRowCount());
context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).
setValue(verificationResult.getAfterRebuildInvalidIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).
+ setValue(verificationResult.getAfterRebuildBeyondMaxLookBackMissingIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).
+ setValue(verificationResult.getAfterRebuildBeyondMaxLookBackInvalidIndexRowCount());
}
if (verificationResult.isVerificationFailed(verifyType)) {
throw new IOException("Index verification failed! " + verificationResult);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
index c10694d..b736787 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
@@ -28,8 +28,12 @@ public enum PhoenixIndexToolJobCounters {
BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT,
BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT,
BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT,
+ BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT,
+ BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT,
AFTER_REBUILD_VALID_INDEX_ROW_COUNT,
AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT,
AFTER_REBUILD_MISSING_INDEX_ROW_COUNT,
- AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
+ AFTER_REBUILD_INVALID_INDEX_ROW_COUNT,
+ AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT,
+ AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 30961e3..5df041d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -32,16 +32,14 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.util.EnvironmentEdge;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.*;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -55,17 +53,13 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Properties;
+import java.util.*;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.UNVERIFIED_BYTES;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_BYTES;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;
@@ -265,9 +259,12 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
when(rebuildScanner.setIndexTableTTL(Matchers.anyInt())).thenCallRealMethod();
when(rebuildScanner.setIndexMaintainer(Matchers.<IndexMaintainer>any())).thenCallRealMethod();
when(rebuildScanner.setIndexKeyToMutationMap(Matchers.<Map>any())).thenCallRealMethod();
+ when(rebuildScanner.setMaxLookBackInMills(Matchers.anyLong())).thenCallRealMethod();
rebuildScanner.setIndexTableTTL(HConstants.FOREVER);
indexMaintainer = pIndexTable.getIndexMaintainer(pDataTable, pconn);
rebuildScanner.setIndexMaintainer(indexMaintainer);
+ // set the maxLookBack to infinite to avoid the compaction
+ rebuildScanner.setMaxLookBackInMills(Long.MAX_VALUE);
}
private void initializeGlobalMockitoSetup() throws IOException {
@@ -354,7 +351,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
@Test
public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException {
IndexToolVerificationResult.PhaseResult
- expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0);
+ expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0, 0, 0);
for (Map.Entry<byte[], List<Mutation>>
entry : indexKeyToMutationMapLocal.entrySet()) {
initializeLocalMockitoSetup(entry, TestType.EXPIRED);
@@ -365,7 +362,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
assertTrue(actualPR.equals(expectedPR));
}
}
- @Ignore
+
@Test
public void testVerifySingleIndexRow_invalidIndexRowCount_cellValue() throws IOException {
IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
@@ -379,7 +376,6 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
}
}
- @Ignore
@Test
public void testVerifySingleIndexRow_invalidIndexRowCount_emptyCell() throws IOException {
IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
@@ -406,7 +402,6 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
}
}
- @Ignore
@Test
public void testVerifySingleIndexRow_invalidIndexRowCount_extraCell() throws IOException {
IndexToolVerificationResult.PhaseResult expectedPR = getInvalidPhaseResult();
@@ -428,33 +423,136 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
}
+
+ // Test the major compaction on index table only.
+ // There is at least one expected mutation within maxLookBack that has its matching one in the actual list.
+ // However there are some expected mutations outside of maxLookBack, which matching ones in actual list may be compacted away.
+ // We will report such row as a valid row.
@Test
- public void testVerifySingleIndexRow_actualMutations_null() throws IOException {
- byte [] validRowKey = getValidRowKey();
- when(indexRow.getRow()).thenReturn(validRowKey);
- when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(null);
- exceptionRule.expect(DoNotRetryIOException.class);
- exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
- rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+ public void testVerifySingleIndexRow_compactionOnIndexTable_atLeastOneExpectedMutationWithinMaxLookBack() throws Exception {
+     String dataRowKey = "k1";
+     byte[] indexRowKey1Bytes = generateIndexRowKey(dataRowKey, "val1");
+     ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+     injectEdge.setValue(1);
+     EnvironmentEdgeManager.injectEdge(injectEdge);
+     try {
+         List<Mutation> expectedMutations = new ArrayList<>();
+         List<Mutation> actualMutations = new ArrayList<>();
+         // Shrink maxLookBack from infinite to a finite interval so that a mutation can fall
+         // outside the maxLookBack window.
+         long maxLookbackInMills = 10 * 1000;
+         rebuildScanner.setMaxLookBackInMills(maxLookbackInMills);
+
+         Put put = new Put(indexRowKey1Bytes);
+         Cell cell = CellUtil.createCell(indexRowKey1Bytes,
+                 QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                 QueryConstants.EMPTY_COLUMN_BYTES,
+                 EnvironmentEdgeManager.currentTimeMillis(),
+                 KeyValue.Type.Put.getCode(),
+                 IndexRegionObserver.VERIFIED_BYTES);
+         put.add(cell);
+         // This mutation is beyond maxLookBack, so add it to expectedMutations only.
+         expectedMutations.add(put);
+
+         // Advance the clock by maxLookBack: the previous mutation is now outside the window,
+         // the next one is inside it.
+         injectEdge.incrementValue(maxLookbackInMills);
+         put = new Put(indexRowKey1Bytes);
+         cell = CellUtil.createCell(indexRowKey1Bytes,
+                 QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                 QueryConstants.EMPTY_COLUMN_BYTES,
+                 EnvironmentEdgeManager.currentTimeMillis(),
+                 KeyValue.Type.Put.getCode(),
+                 IndexRegionObserver.VERIFIED_BYTES);
+         put.add(cell);
+         // This mutation is within maxLookBack, so it cannot have been compacted away and
+         // appears in both expectedMutations and actualMutations.
+         expectedMutations.add(put);
+         actualMutations.add(put);
+         Result actualMutationsScanResult = Result.create(Arrays.asList(cell));
+
+         Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
+         indexKeyToMutationMap.put(indexRowKey1Bytes, expectedMutations);
+         rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+         when(rebuildScanner.prepareActualIndexMutations(any(Result.class))).thenReturn(actualMutations);
+
+         injectEdge.incrementValue(1);
+         IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult();
+         // Report this validation as a success
+         assertTrue(rebuildScanner.verifySingleIndexRow(actualMutationsScanResult, actualPR));
+         // validIndexRowCount = 1
+         IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0);
+         // assertEquals prints both PhaseResult values when the comparison fails.
+         assertEquals(expectedPR, actualPR);
+     } finally {
+         // The injected clock is static state shared by every test; restore the default edge
+         // so the manual time does not leak into tests that run afterwards.
+         EnvironmentEdgeManager.reset();
+     }
}
+ // Test the major compaction on index table only.
+ // All expected mutations are beyond the maxLookBack, and there are no matching ones in the actual list because of major compaction.
+ // We will report such row as an invalid beyond maxLookBack row.
@Test
- public void testVerifySingleIndexRow_actualMutations_empty() throws IOException {
- byte [] validRowKey = getValidRowKey();
- when(indexRow.getRow()).thenReturn(validRowKey);
- actualMutationList = new ArrayList<>();
- when(rebuildScanner.prepareActualIndexMutations(indexRow)).thenReturn(actualMutationList);
- exceptionRule.expect(DoNotRetryIOException.class);
- exceptionRule.expectMessage(IndexRebuildRegionScanner.ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
- rebuildScanner.verifySingleIndexRow(indexRow, actualPR);
+ public void testVerifySingleIndexRow_compactionOnIndexTable_noExpectedMutationWithinMaxLookBack() throws Exception {
+     String dataRowKey = "k1";
+     byte[] indexRowKey1Bytes = generateIndexRowKey(dataRowKey, "val1");
+     List<Mutation> expectedMutations = new ArrayList<>();
+     List<Mutation> actualMutations = new ArrayList<>();
+     // Shrink maxLookBack from infinite to a finite interval so that a mutation can fall
+     // outside the maxLookBack window.
+     long maxLookbackInMills = 10 * 1000;
+     rebuildScanner.setMaxLookBackInMills(maxLookbackInMills);
+
+     ManualEnvironmentEdge injectEdge = new ManualEnvironmentEdge();
+     injectEdge.setValue(1);
+     EnvironmentEdgeManager.injectEdge(injectEdge);
+     try {
+         Put put = new Put(indexRowKey1Bytes);
+         Cell cell = CellUtil.createCell(indexRowKey1Bytes,
+                 QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                 QueryConstants.EMPTY_COLUMN_BYTES,
+                 EnvironmentEdgeManager.currentTimeMillis(),
+                 KeyValue.Type.Put.getCode(),
+                 VERIFIED_BYTES);
+         put.add(cell);
+         // This mutation is beyond maxLookBack, so add it to expectedMutations only.
+         expectedMutations.add(put);
+
+         injectEdge.incrementValue(maxLookbackInMills);
+         put = new Put(indexRowKey1Bytes);
+         cell = CellUtil.createCell(indexRowKey1Bytes,
+                 QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
+                 QueryConstants.EMPTY_COLUMN_BYTES,
+                 EnvironmentEdgeManager.currentTimeMillis(),
+                 KeyValue.Type.Put.getCode(),
+                 UNVERIFIED_BYTES);
+         put.add(cell);
+         // This mutation is in actualMutations only, as it is an unverified put.
+         actualMutations.add(put);
+         Result actualMutationsScanResult = Result.create(Arrays.asList(cell));
+
+         Map<byte[], List<Mutation>> indexKeyToMutationMap = Maps.newTreeMap((Bytes.BYTES_COMPARATOR));
+         indexKeyToMutationMap.put(indexRowKey1Bytes, expectedMutations);
+         rebuildScanner.setIndexKeyToMutationMap(indexKeyToMutationMap);
+         when(rebuildScanner.prepareActualIndexMutations(any(Result.class))).thenReturn(actualMutations);
+
+         injectEdge.incrementValue(1);
+         IndexToolVerificationResult.PhaseResult actualPR = new IndexToolVerificationResult.PhaseResult();
+         // Report this validation as a failure
+         assertFalse(rebuildScanner.verifySingleIndexRow(actualMutationsScanResult, actualPR));
+         // beyondMaxLookBackInvalidIndexRowCount = 1
+         IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1);
+         // assertEquals prints both PhaseResult values when the comparison fails.
+         assertEquals(expectedPR, actualPR);
+     } finally {
+         // The injected clock is static state shared by every test; restore the default edge
+         // so the manual time does not leak into tests that run afterwards.
+         EnvironmentEdgeManager.reset();
+     }
+ }
+
+ /**
+  * Builds an index row key laid out as: the bytes of {@code dataVal} (omitted when it is
+  * null or empty), then {@code QueryConstants.SEPARATOR_BYTE}, then the bytes of
+  * {@code dataRowKey}.
+  *
+  * @param dataRowKey data table row key, placed after the separator
+  * @param dataVal indexed value, placed before the separator; may be null or empty
+  * @return the concatenated key bytes
+  */
+ private static byte[] generateIndexRowKey(String dataRowKey, String dataVal) {
+     // Concatenate the arrays directly instead of boxing every byte into a List<Byte>.
+     byte[] valueBytes = (dataVal == null || dataVal.isEmpty())
+             ? new byte[0]
+             : Bytes.toBytes(dataVal);
+     byte[] separator = new byte[] { QueryConstants.SEPARATOR_BYTE };
+     return Bytes.add(valueBytes, separator, Bytes.toBytes(dataRowKey));
}
private IndexToolVerificationResult.PhaseResult getValidPhaseResult() {
- return new IndexToolVerificationResult.PhaseResult(1,0,0,0);
+ return new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0);
}
private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() {
- return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1);
+ return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1, 0, 0);
}
private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry,
@@ -537,7 +635,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
newCell =
CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
Bytes.toBytes(UNEXPECTED_COLUMN),
- EnvironmentEdgeManager.currentTimeMillis(),
+ c.getTimestamp(),
KeyValue.Type.Put.getCode(), Bytes.toBytes("zxcv"));
newCellList.add(newCell);
newCellList.add(c);
@@ -575,7 +673,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
private Cell getVerifiedEmptyCell(Cell c) {
return CellUtil.createCell(CellUtil.cloneRow(c), CellUtil.cloneFamily(c),
indexMaintainer.getEmptyKeyValueQualifier(),
- EnvironmentEdgeManager.currentTimeMillis(),
+ c.getTimestamp(),
KeyValue.Type.Put.getCode(), VERIFIED_BYTES);
}