You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/05/19 01:59:42 UTC
[phoenix] branch 4.x updated: PHOENIX-5896: Implement incremental
rebuild along the failed regions in IndexTool (#779)
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 5f9364d PHOENIX-5896: Implement incremental rebuild along the failed regions in IndexTool (#779)
5f9364d is described below
commit 5f9364db7e4925229704706e148e62f4cf4ec4c2
Author: Swaroopa Kadam <sw...@gmail.com>
AuthorDate: Mon May 18 18:59:34 2020 -0700
PHOENIX-5896: Implement incremental rebuild along the failed regions in IndexTool (#779)
---
.../end2end/IndexToolForNonTxGlobalIndexIT.java | 97 +++++++++++++++++
.../org/apache/phoenix/end2end/IndexToolIT.java | 18 +++-
.../phoenix/end2end/IndexToolTimeRangeIT.java | 16 +--
.../coprocessor/BaseScannerRegionObserver.java | 1 +
.../coprocessor/GlobalIndexRegionScanner.java | 2 -
.../coprocessor/IndexRebuildRegionScanner.java | 38 ++++++-
.../phoenix/coprocessor/IndexerRegionScanner.java | 3 +
.../PhoenixServerBuildIndexInputFormat.java | 6 +-
.../apache/phoenix/mapreduce/index/IndexTool.java | 119 ++++++++++++++-------
.../index/IndexVerificationOutputRepository.java | 21 ++--
.../index/IndexVerificationResultRepository.java | 76 +++++++++----
.../mapreduce/util/PhoenixConfigurationUtil.java | 12 +++
.../org/apache/phoenix/index/IndexToolTest.java | 47 ++++++--
.../apache/phoenix/index/IndexUpgradeToolTest.java | 6 +-
.../org/apache/phoenix/index/ShouldVerifyTest.java | 98 +++++++++++++++++
.../util/PhoenixConfigurationUtilTest.java | 11 ++
16 files changed, 472 insertions(+), 99 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
index 018ec0a..91b9258 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
@@ -36,6 +37,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -43,19 +45,33 @@ import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
+import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.INDEX_TOOL_RUN_STATUS_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.ROW_KEY_SEPARATOR;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RUN_STATUS_EXECUTED;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RUN_STATUS_SKIPPED;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
@@ -70,6 +86,7 @@ import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEF
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.CURRENT_SCN_VALUE;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -81,6 +98,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT
private boolean directApi = true;
private boolean useSnapshot = false;
private boolean mutable;
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
public IndexToolForNonTxGlobalIndexIT(boolean mutable) {
StringBuilder optionBuilder = new StringBuilder();
@@ -485,4 +504,82 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT
}
}
+ /**
+  * Exercises the retry-verify ("-rv") option of IndexTool end to end.
+  *
+  * Steps: build the index with AFTER verification and record the job's SCN; confirm that an
+  * "-rv" timestamp absent from the result table is rejected; re-run twice with a recorded
+  * timestamp and expect RUN_STATUS_SKIPPED rows each time; finally check
+  * isValidLastVerifyTime directly.
+  */
+ @Test
+ public void testIndexToolForIncrementalRebuild() throws Exception {
+     String schemaName = generateUniqueName();
+     String dataTableName = generateUniqueName();
+     String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+     String indexTableName = generateUniqueName();
+     String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+     Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+         conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+             + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) "+tableDDLOptions);
+         conn.createStatement().execute(String.format(
+             "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, dataTableFullName));
+
+         conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+         conn.createStatement().execute("upsert into " + dataTableFullName + " values (2, 'Phoenix1', 'B')");
+         conn.commit();
+
+         IndexTool it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+             null, 0, IndexTool.IndexVerifyType.AFTER);
+         Long scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L);
+         verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 3, RUN_STATUS_EXECUTED);
+
+         // The rejection is asserted inline instead of through the ExpectedException rule:
+         // with the rule, JUnit ends the test at the first throw and none of the steps
+         // below would ever execute.
+         try {
+             IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                 null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", String.valueOf(10L));
+             Assert.fail("IndexTool accepted an -rv timestamp that is not in the result table");
+         } catch (RuntimeException e) {
+             Assert.assertTrue(String.valueOf(e.getMessage()).contains(RETRY_VERIFY_NOT_APPLICABLE));
+         }
+
+         it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+             null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", Long.toString(scn));
+         scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L);
+         verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 6, RUN_STATUS_SKIPPED);
+
+         conn.createStatement().execute( "DELETE FROM "+indexTableFullName);
+         conn.commit();
+         TestUtil.doMajorCompaction(conn, indexTableFullName);
+
+         it = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+             null, 0, IndexTool.IndexVerifyType.AFTER, "-rv", Long.toString(scn));
+         scn = it.getJob().getConfiguration().getLong(CURRENT_SCN_VALUE, 1L);
+         verifyRunStatusFromResultTable(conn, scn, indexTableFullName, 9, RUN_STATUS_SKIPPED);
+
+         // The last run was expected to be skipped, so the emptied index must still be empty.
+         try (ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + indexTableFullName)) {
+             Assert.assertFalse(rs.next());
+         }
+
+         //testing the dependent method
+         Assert.assertFalse(it.isValidLastVerifyTime(10L));
+         Assert.assertFalse(it.isValidLastVerifyTime(EnvironmentEdgeManager.currentTimeMillis() - 1000L));
+         Assert.assertTrue(it.isValidLastVerifyTime(scn));
+     }
+ }
+
+ /**
+  * Checks the rows IndexTool wrote to its result table for one run.
+  *
+  * @param conn           Phoenix connection used to reach the HBase result table
+  * @param scn            SCN of the run; result row keys are expected to start with it
+  * @param indexTable     full index table name, second component of the row-key prefix
+  * @param totalRows      expected number of rows in the whole result table
+  * @param expectedStatus run status every matching row must carry
+  * @return the run-status values found for the run, in scan order
+  */
+ private List<String> verifyRunStatusFromResultTable(Connection conn, Long scn, String indexTable, int totalRows, String expectedStatus) throws SQLException, IOException {
+     List<String> output = new ArrayList<>();
+     // Both the table handle and the scanner hold client resources, so close them
+     // even when an assertion fails part-way through.
+     try (Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+             .getTable(RESULT_TABLE_NAME_BYTES)) {
+         Assert.assertEquals(totalRows, TestUtil.getRowCount(hIndexToolTable, false));
+         Scan s = new Scan();
+         s.setRowPrefixFilter(Bytes.toBytes(String.format("%s%s%s", scn, ROW_KEY_SEPARATOR, indexTable)));
+         try (ResultScanner rs = hIndexToolTable.getScanner(s)) {
+             for (Result r : rs) {
+                 Assert.assertNotNull(r);
+                 List<Cell> cells = r.getColumnCells(RESULT_TABLE_COLUMN_FAMILY, INDEX_TOOL_RUN_STATUS_BYTES);
+                 Assert.assertEquals(1, cells.size());
+                 Assert.assertTrue(Bytes.toString(cells.get(0).getRow()).startsWith(String.valueOf(scn)));
+                 output.add(Bytes.toString(cells.get(0).getValue()));
+             }
+         }
+     }
+     //for each region
+     Assert.assertEquals(3, output.size());
+     for (String status : output) {
+         Assert.assertEquals(expectedStatus, status);
+     }
+     return output;
+ }
+
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 8a9cc10..a2bd788 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -631,7 +631,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indxTable, String tenantId,
- IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) {
+ IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime, Long incrementalVerify) {
List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -664,6 +664,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
args.add("-et");
args.add(String.valueOf(endTime));
}
+ if(incrementalVerify!=null) {
+ args.add("-rv");
+ args.add(String.valueOf(incrementalVerify));
+ }
args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
return args;
@@ -672,7 +676,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
- tenantId, verifyType, null, null);
+ tenantId, verifyType, null, null, null);
return args.toArray(new String[0]);
}
@@ -680,7 +684,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
String dataTable, String indexTable, String tenantId,
IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
- tenantId, verifyType, startTime, endTime);
+ tenantId, verifyType, startTime, endTime, null);
+ return args.toArray(new String[0]);
+ }
+
+ /**
+  * Builds the IndexTool command line for a retry-verify run: no start or end time is
+  * passed, and {@code incrementalVerify} (when non-null) is forwarded to the argument
+  * list builder as the retry-verify timestamp.
+  */
+ public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
+         String dataTable, String indexTable, String tenantId,
+         IndexTool.IndexVerifyType verifyType, Long incrementalVerify) {
+     final Long noStartTime = null;
+     final Long noEndTime = null;
+     List<String> argList = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
+             tenantId, verifyType, noStartTime, noEndTime, incrementalVerify);
+     String[] argArray = new String[0];
+     return argList.toArray(argArray);
+ }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java
index a63f06a..3deedad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolTimeRangeIT.java
@@ -130,8 +130,8 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT {
@Test
public void testValidTimeRange() throws Exception {
String [] args = {"--delete-all-and-rebuild",
- "--starttime", myClock.getRelativeTimeAsString(1),
- "--endtime", myClock.getRelativeTimeAsString(9)};
+ "--start-time", myClock.getRelativeTimeAsString(1),
+ "--end-time", myClock.getRelativeTimeAsString(9)};
runIndexTool(args, 0);
// all rows should be rebuilt
Assert.assertEquals(5, countRowsInIndex());
@@ -141,8 +141,8 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT {
@Test
public void testValidTimeRange_startTimeInBetween() throws Exception {
String [] args = {"--delete-all-and-rebuild",
- "--starttime", myClock.getRelativeTimeAsString(6),
- "--endtime", myClock.getRelativeTimeAsString(9)};
+ "--start-time", myClock.getRelativeTimeAsString(6),
+ "--end-time", myClock.getRelativeTimeAsString(9)};
runIndexTool(args, 0);
// only last 3 rows should be rebuilt
Assert.assertEquals(3, countRowsInIndex());
@@ -151,8 +151,8 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT {
@Test
public void testValidTimeRange_endTimeInBetween() throws Exception {
String [] args = {"--delete-all-and-rebuild",
- "--starttime", myClock.getRelativeTimeAsString(1),
- "--endtime", myClock.getRelativeTimeAsString(6)};
+ "--start-time", myClock.getRelativeTimeAsString(1),
+ "--end-time", myClock.getRelativeTimeAsString(6)};
runIndexTool(args, 0);
// only first 2 should be rebuilt
Assert.assertEquals(2, countRowsInIndex());
@@ -171,7 +171,7 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT {
public void testValidTimeRange_onlyStartTimePassed() throws Exception {
//starttime passed of last upsert
String [] args = {"--delete-all-and-rebuild",
- "--starttime", myClock.getRelativeTimeAsString(8)};
+ "--start-time", myClock.getRelativeTimeAsString(8)};
runIndexTool(args, 0);
Assert.assertEquals(1, countRowsInIndex());
}
@@ -180,7 +180,7 @@ public class IndexToolTimeRangeIT extends BaseUniqueNamesOwnClusterIT {
public void testValidTimeRange_onlyEndTimePassed() throws Exception {
//end time passed as time of second upsert
String [] args = {"--delete-all-and-rebuild",
- "--endtime", myClock.getRelativeTimeAsString(5)};
+ "--end-time", myClock.getRelativeTimeAsString(5)};
runIndexTool(args, 0);
Assert.assertEquals(1, countRowsInIndex());
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 6e0a1e4..4897741 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -88,6 +88,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
// Index verification type done by the index tool
public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
+ public static final String INDEX_RETRY_VERIFY = "_IndexRetryVerify";
/*
* Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 35a0a8a..b5334d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -107,8 +107,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config);
- verificationResultRepository =
- new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
new ThreadPoolBuilder("IndexVerify",
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 4f1c48f..9feb27f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ExecutionException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import com.sun.org.apache.xpath.internal.operations.Bool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -57,6 +59,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.ServerCacheClient;
@@ -111,6 +114,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
private Map<byte[], NavigableSet<byte[]>> familyMap;
private byte[][] viewConstants;
private IndexVerificationOutputRepository verificationOutputRepository;
+ private boolean skipped = false;
@VisibleForTesting
public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
@@ -148,6 +152,8 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
verificationOutputRepository =
new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory);
+ verificationResultRepository =
+ new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
dataKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
@@ -159,6 +165,32 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
}
}
+ /**
+  * Test entry point: overwrites the scanner state that the no-argument shouldVerify()
+  * reads with the supplied values, then delegates to it.
+  *
+  * @return whatever the no-argument shouldVerify() returns for the injected state
+  */
+ @VisibleForTesting
+ public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
+         byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
+         IndexVerificationResultRepository verificationResultRepository) throws IOException {
+     // Inputs of the decision itself.
+     this.scan = scan;
+     this.indexRowKey = indexRowKey;
+     this.verifyType = verifyType;
+     // Collaborators used to look up an earlier verification result.
+     this.verificationResultRepository = verificationResultRepository;
+     this.indexMaintainer = indexMaintainer;
+     this.region = region;
+     boolean decision = shouldVerify();
+     return decision;
+ }
+
+ /**
+  * Decides whether this scanner should go ahead with rebuild/verification.
+  *
+  * Returns true when an index row key is set (the read-repair case) or when the scan
+  * carries no retry-verify timestamp (attribute missing or zero). Otherwise the result
+  * repository is queried for that timestamp, the answer is stored in verificationResult,
+  * and the method returns true only if nothing was found.
+  */
+ private boolean shouldVerify() throws IOException {
+     byte[] retryAttribute = scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY);
+     Long retryTimestamp;
+     if (retryAttribute == null) {
+         retryTimestamp = 0L;
+     } else {
+         retryTimestamp = Bytes.toLong(retryAttribute);
+     }
+     //In case of read repair, proceed with rebuild
+     boolean readRepair = indexRowKey != null;
+     //All other types of rebuilds/verification should be incrementally performed if appropriate param is passed
+     boolean retryRequested = retryTimestamp != 0;
+     if (readRepair || !retryRequested) {
+         return true;
+     }
+     verificationResult = verificationResultRepository.getVerificationResult(
+             retryTimestamp, scan, region, indexMaintainer.getIndexTableName());
+     if (verificationResult != null) {
+         return false;
+     }
+     return true;
+ }
+
private void setReturnCodeForSingleRowRebuild() throws IOException {
try (RegionScanner scanner = region.getScanner(scan)) {
List<Cell> row = new ArrayList<>();
@@ -200,7 +232,7 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
if (verify) {
try {
verificationResultRepository.logToIndexToolResultTable(verificationResult,
- verifyType, region.getRegionInfo().getRegionName());
+ verifyType, region.getRegionInfo().getRegionName(), skipped);
} finally {
this.pool.stop("IndexRebuildRegionScanner is closing");
hTableFactory.shutdown();
@@ -1191,6 +1223,10 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
}
Cell lastCell = null;
int rowCount = 0;
+ if(!shouldVerify()) {
+ skipped = true;
+ return false;
+ }
region.startRegionOperation();
try {
byte[] uuidValue = ServerCacheClient.generateId();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 88bac86..b493729 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -58,6 +58,7 @@ import org.apache.phoenix.hbase.index.parallel.TaskBatch;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.schema.types.PVarbinary;
@@ -78,6 +79,8 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
final RegionCoprocessorEnvironment env) throws IOException {
super(innerScanner, region, scan, env);
indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ verificationResultRepository =
+ new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
}
@Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index d129e93..9408369 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -43,6 +43,7 @@ import com.google.common.base.Preconditions;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getCurrentScnValue;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolLastVerifyTime;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
@@ -74,6 +75,8 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
final String currentScnValue = getCurrentScnValue(configuration);
final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+ final String lastVerifyTime = getIndexToolLastVerifyTime(configuration);
+
//until PHOENIX-5783 is fixed; we'll continue with startTime = 0
final String startTimeValue = null;
final Properties overridingProps = new Properties();
@@ -97,11 +100,12 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
MutationPlan plan = compiler.compile(indexTable);
Scan scan = plan.getContext().getScan();
-
+ Long lastVerifyTimeValue = lastVerifyTime == null ? 0L : Long.valueOf(lastVerifyTime);
try {
scan.setTimeRange(startTime, scn);
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE, getIndexVerifyType(configuration).toBytes());
+ scan.setAttribute(BaseScannerRegionObserver.INDEX_RETRY_VERIFY, Bytes.toBytes(lastVerifyTimeValue));
} catch (IOException e) {
throw new SQLException(e);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index e30da37..81cbea3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -23,6 +23,8 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAM
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.ROW_KEY_SEPARATOR;
+
import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -49,6 +51,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -56,7 +59,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -92,7 +97,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
@@ -168,10 +172,11 @@ public class IndexTool extends Configured implements Tool {
private PTable pDataTable;
private String tenantId = null;
private Job job;
- private Long startTime, endTime;
+ private Long startTime, endTime, lastVerifyTime;
private IndexType indexType;
private String basePath;
byte[][] splitKeysBeforeJob = null;
+ Configuration configuration;
private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true,
"Phoenix schema name (optional)");
@@ -230,20 +235,27 @@ public class IndexTool extends Configured implements Tool {
+ "If specified, truncates the index table and rebuilds (optional)");
private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
- private static final Option START_TIME_OPTION = new Option("st", "starttime",
+ private static final Option START_TIME_OPTION = new Option("st", "start-time",
true, "Start time for indextool rebuild or verify");
- private static final Option END_TIME_OPTION = new Option("et", "endtime",
+ private static final Option END_TIME_OPTION = new Option("et", "end-time",
true, "End time for indextool rebuild or verify");
+ private static final Option RETRY_VERIFY_OPTION = new Option("rv", "retry-verify",
+ true, "Max scan ts of the last rebuild/verify that needs to be retried incrementally");
+
public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";
public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
+ "or equal to endTime "
+ "or either of them are set in the future; IndexTool can't proceed.";
- public static final String FEATURE_NOT_APPLICABLE = "starttime-endtime feature is only "
+ public static final String FEATURE_NOT_APPLICABLE = "start-time/end-time and retry verify feature are only "
+ "applicable for local or non-transactional global indexes";
+
+ public static final String RETRY_VERIFY_NOT_APPLICABLE = "retry verify feature accepts "
+ + "non-zero ts set in the past and ts must be present in PHOENIX_INDEX_TOOL_RESULT table";
+
private Options getOptions() {
final Options options = new Options();
options.addOption(SCHEMA_NAME_OPTION);
@@ -262,10 +274,12 @@ public class IndexTool extends Configured implements Tool {
SPLIT_INDEX_OPTION.setOptionalArg(true);
START_TIME_OPTION.setOptionalArg(true);
END_TIME_OPTION.setOptionalArg(true);
+ RETRY_VERIFY_OPTION.setOptionalArg(true);
options.addOption(AUTO_SPLIT_INDEX_OPTION);
options.addOption(SPLIT_INDEX_OPTION);
options.addOption(START_TIME_OPTION);
options.addOption(END_TIME_OPTION);
+ options.addOption(RETRY_VERIFY_OPTION);
return options;
}
@@ -338,9 +352,9 @@ public class IndexTool extends Configured implements Tool {
return startTime;
}
- public Long getEndTime() {
- return endTime;
- }
+ public Long getEndTime() { return endTime; }
+
+ public Long getLastVerifyTime() { return lastVerifyTime; }
class JobFactory {
Connection connection;
@@ -379,6 +393,9 @@ public class IndexTool extends Configured implements Tool {
if (endTime != null) {
PhoenixConfigurationUtil.setCurrentScnValue(configuration, endTime);
}
+ if (lastVerifyTime != null) {
+ PhoenixConfigurationUtil.setIndexToolLastVerifyTime(configuration, lastVerifyTime);
+ }
return configureJobForServerBuildIndex();
}
}
@@ -475,21 +492,22 @@ public class IndexTool extends Configured implements Tool {
for (String index : disableIndexes) {
quotedIndexes.add("'" + index + "'");
}
- ResultSet rs = connection.createStatement()
+ try (ResultSet rs = connection.createStatement()
.executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP + "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " ("
+ ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " + TABLE_SCHEM
+ (schemaName != null && schemaName.length() > 0 ? "='" + schemaName + "'" : " IS NULL")
- + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")");
- if (rs.next()) {
- maxRebuilAsyncDate = rs.getLong(1);
- maxDisabledTimeStamp = rs.getLong(2);
- }
- // Do check if table is disabled again after user invoked async rebuilding during the run of the job
- if (maxRebuilAsyncDate > maxDisabledTimeStamp) {
- return maxRebuilAsyncDate;
- } else {
- throw new RuntimeException(
- "Inconsistent state we have one or more index tables which are disabled after the async is called!!");
+ + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")")) {
+ if (rs.next()) {
+ maxRebuilAsyncDate = rs.getLong(1);
+ maxDisabledTimeStamp = rs.getLong(2);
+ }
+ // Do check if table is disabled again after user invoked async rebuilding during the run of the job
+ if (maxRebuilAsyncDate > maxDisabledTimeStamp) {
+ return maxRebuilAsyncDate;
+ } else {
+ throw new RuntimeException(
+ "Inconsistent state we have one or more index tables which are disabled after the async is called!!");
+ }
}
}
@@ -665,14 +683,18 @@ public class IndexTool extends Configured implements Tool {
printHelpAndExit(e.getMessage(), getOptions());
return -1;
}
+ configuration = HBaseConfiguration.addHbaseResources(getConf());
populateIndexToolAttributes(cmdLine);
- Configuration configuration = getConfiguration(tenantId);
+
+ if (tenantId != null) {
+ configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ }
try (Connection conn = getConnection(configuration)) {
createIndexToolTables(conn);
if (dataTable != null && indexTable != null) {
setupIndexAndDataTable(conn);
- checkTimeRangeFeature(startTime, endTime, pDataTable, isLocalIndexBuild);
+ checkIfFeatureApplicable(startTime, endTime, lastVerifyTime, pDataTable, isLocalIndexBuild);
if (shouldDeleteBeforeRebuild) {
deleteBeforeRebuild(conn);
}
@@ -694,18 +716,14 @@ public class IndexTool extends Configured implements Tool {
}
}
- public static void checkTimeRangeFeature(Long startTime, Long endTime, PTable pDataTable, boolean isLocalIndexBuild) {
- if (isTimeRangeSet(startTime, endTime) && !isTimeRangeFeatureApplicable(pDataTable, isLocalIndexBuild)) {
- throw new RuntimeException(FEATURE_NOT_APPLICABLE);
- }
- }
-
- private Configuration getConfiguration(String tenantId) {
- final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
- if (tenantId != null) {
- configuration.set(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+ public static void checkIfFeatureApplicable(Long startTime, Long endTime, Long lastVerifyTime,
+ PTable pDataTable, boolean isLocalIndexBuild) {
+ boolean isApplicable = isFeatureApplicable(pDataTable, isLocalIndexBuild);
+ if (!isApplicable) {
+ if(isTimeRangeSet(startTime, endTime) || lastVerifyTime!=null) {
+ throw new RuntimeException(FEATURE_NOT_APPLICABLE);
+ }
}
- return configuration;
}
private boolean submitIndexToolJob(Connection conn, Configuration configuration)
@@ -734,10 +752,11 @@ public class IndexTool extends Configured implements Tool {
}
@VisibleForTesting
- public void populateIndexToolAttributes(CommandLine cmdLine) {
+ public int populateIndexToolAttributes(CommandLine cmdLine) throws Exception {
boolean useTenantId = cmdLine.hasOption(TENANT_ID_OPTION.getOpt());
boolean useStartTime = cmdLine.hasOption(START_TIME_OPTION.getOpt());
boolean useEndTime = cmdLine.hasOption(END_TIME_OPTION.getOpt());
+ boolean retryVerify = cmdLine.hasOption(RETRY_VERIFY_OPTION.getOpt());
boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
if (useTenantId) {
@@ -749,6 +768,10 @@ public class IndexTool extends Configured implements Tool {
if (useEndTime) {
endTime = new Long(cmdLine.getOptionValue(END_TIME_OPTION.getOpt()));
}
+ if(retryVerify) {
+ lastVerifyTime = new Long(cmdLine.getOptionValue(RETRY_VERIFY_OPTION.getOpt()));
+ validateLastVerifyTime();
+ }
if(isTimeRangeSet(startTime, endTime)) {
validateTimeRange();
}
@@ -765,6 +788,30 @@ public class IndexTool extends Configured implements Tool {
isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
shouldDeleteBeforeRebuild = cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt());
+ return 0;
+ }
+
+ private void validateLastVerifyTime() throws Exception {
+ Long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+ if (lastVerifyTime.compareTo(currentTime) > 0 || lastVerifyTime == 0L || !isValidLastVerifyTime(lastVerifyTime)) {
+ throw new RuntimeException(RETRY_VERIFY_NOT_APPLICABLE);
+ }
+
+ }
+
+ /**
+  * Checks whether the IndexTool result table holds at least one row for this index at the
+  * given timestamp.
+  * <p>
+  * The lookup is a prefix scan on {@code <lastVerifyTime>|<physical index table name>}, where
+  * the physical name is derived from {@code schemaName}, {@code indexTable} and the
+  * namespace-mapping setting of the connection.
+  *
+  * @param lastVerifyTime timestamp supplied through the retry-verify option
+  * @return {@code true} if the scan finds a matching row, {@code false} otherwise
+  * @throws Exception if the connection cannot be opened or the scan fails
+  */
+ public boolean isValidLastVerifyTime(Long lastVerifyTime) throws Exception {
+     try (Connection conn = getConnection(configuration)) {
+         ConnectionQueryServices cqs = conn.unwrap(PhoenixConnection.class).getQueryServices();
+         // The table handle is closed together with the connection scope; it was leaked before.
+         try (Table hIndexToolTable =
+                 cqs.getTable(IndexVerificationResultRepository.RESULT_TABLE_NAME_BYTES)) {
+             boolean isNamespaceMapped = SchemaUtil.isNamespaceMappingEnabled(null, cqs.getProps());
+             Scan s = new Scan();
+             s.setRowPrefixFilter(Bytes.toBytes(String.format("%s%s%s", lastVerifyTime,
+                     ROW_KEY_SEPARATOR,
+                     SchemaUtil.getPhysicalHBaseTableName(schemaName, indexTable, isNamespaceMapped))));
+             // The scanner is closed as soon as the first row (or its absence) is known.
+             try (ResultScanner rs = hIndexToolTable.getScanner(s)) {
+                 return rs.next() != null;
+             }
+         }
+     }
+ }
private void validateTimeRange() {
@@ -810,7 +857,7 @@ public class IndexTool extends Configured implements Tool {
return startTime != null || endTime != null;
}
- private static boolean isTimeRangeFeatureApplicable(PTable dataTable, boolean isLocalIndexBuild) {
+ private static boolean isFeatureApplicable(PTable dataTable, boolean isLocalIndexBuild) {
if (isLocalIndexBuild || !dataTable.isTransactional()) {
return true;
}
@@ -1028,7 +1075,7 @@ public class IndexTool extends Configured implements Tool {
indexingTool.setConf(conf);
int status = indexingTool.run(args.toArray(new String[0]));
Job job = indexingTool.getJob();
- return new AbstractMap.SimpleEntry<Integer, Job>(status, job);
+ return new AbstractMap.SimpleEntry<>(status, job);
}
public static void main(final String[] args) throws Exception {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
index 6e97a9d..7e8ee23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
@@ -164,17 +164,12 @@ public class IndexVerificationOutputRepository implements AutoCloseable {
throws IOException {
byte[] rowKey = generateOutputTableRowKey(scanMaxTs, indexTable.getName().toBytes(), dataRowKey);
Put put = new Put(rowKey);
- put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES,
- scanMaxTs, tableName);
- put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_NAME_BYTES,
- scanMaxTs, indexName);
- put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_TS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(dataRowTs)));
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_NAME_BYTES, tableName);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_NAME_BYTES, indexName);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, DATA_TABLE_TS_BYTES, Bytes.toBytes(Long.toString(dataRowTs)));
- put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_ROW_KEY_BYTES,
- scanMaxTs, indexRowKey);
- put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_TS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(indexRowTs)));
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_ROW_KEY_BYTES, indexRowKey);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, INDEX_TABLE_TS_BYTES, Bytes.toBytes(Long.toString(indexRowTs)));
byte[] errorMessageBytes;
if (expectedValue != null) {
errorMessageBytes = getErrorMessageBytes(errorMsg, expectedValue, actualValue);
@@ -183,11 +178,11 @@ public class IndexVerificationOutputRepository implements AutoCloseable {
} else {
errorMessageBytes = Bytes.toBytes(errorMsg);
}
- put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, ERROR_MESSAGE_BYTES, errorMessageBytes);
if (isBeforeRebuild) {
- put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_BEFORE_VALUE);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, PHASE_BEFORE_VALUE);
} else {
- put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, scanMaxTs, PHASE_AFTER_VALUE);
+ put.addColumn(OUTPUT_TABLE_COLUMN_FAMILY, VERIFICATION_PHASE_BYTES, PHASE_AFTER_VALUE);
}
outputTable.put(put);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
index e52823e..08c431a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
@@ -22,11 +22,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compat.hbase.CompatUtil;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
@@ -44,9 +46,12 @@ import java.sql.SQLException;
public class IndexVerificationResultRepository implements AutoCloseable {
+ public static final String RUN_STATUS_SKIPPED = "Skipped";
+ public static final String RUN_STATUS_EXECUTED = "Executed";
private Table resultTable;
private Table indexTable;
- public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes("|");
+ public static final String ROW_KEY_SEPARATOR = "|";
+ public static final byte[] ROW_KEY_SEPARATOR_BYTE = Bytes.toBytes(ROW_KEY_SEPARATOR);
public final static String RESULT_TABLE_NAME = "PHOENIX_INDEX_TOOL_RESULT";
public final static byte[] RESULT_TABLE_NAME_BYTES = Bytes.toBytes(RESULT_TABLE_NAME);
public final static byte[] RESULT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
@@ -57,6 +62,8 @@ public class IndexVerificationResultRepository implements AutoCloseable {
public final static String BEFORE_REBUILD_VALID_INDEX_ROW_COUNT =
"BeforeRebuildValidIndexRowCount";
public final static byte[] BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT);
+ private static final String INDEX_TOOL_RUN_STATUS = "IndexToolRunStatus";
+ public final static byte[] INDEX_TOOL_RUN_STATUS_BYTES = Bytes.toBytes(INDEX_TOOL_RUN_STATUS);
public final static String BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT =
"BeforeRebuildExpiredIndexRowCount";
public final static byte[] BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES = Bytes.toBytes(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT);
@@ -139,7 +146,7 @@ public class IndexVerificationResultRepository implements AutoCloseable {
setResultTable(admin.getConnection().getTable(resultTableName));
}
}
- public static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName,
+ private static byte[] generateResultTableRowKey(long ts, byte[] indexTableName, byte [] regionName,
byte[] startRow, byte[] stopRow) {
byte[] keyPrefix = Bytes.toBytes(Long.toString(ts));
int targetOffset = 0;
@@ -169,56 +176,59 @@ public class IndexVerificationResultRepository implements AutoCloseable {
}
public void logToIndexToolResultTable(IndexToolVerificationResult verificationResult,
- IndexTool.IndexVerifyType verifyType, byte[] region) throws IOException {
+ IndexTool.IndexVerifyType verifyType, byte[] region) throws IOException {
+ logToIndexToolResultTable(verificationResult, verifyType, region, false);
+ }
+
+ public void logToIndexToolResultTable(IndexToolVerificationResult verificationResult,
+ IndexTool.IndexVerifyType verifyType, byte[] region, boolean skipped) throws IOException {
long scanMaxTs = verificationResult.getScanMaxTs();
byte[] rowKey = generateResultTableRowKey(scanMaxTs, indexTable.getName().toBytes(),
region, verificationResult.getStartRow(),
verificationResult.getStopRow());
Put put = new Put(rowKey);
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, SCANNED_DATA_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getScannedDataRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getScannedDataRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, REBUILT_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getRebuiltIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getRebuiltIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, INDEX_TOOL_RUN_STATUS_BYTES,
+ Bytes.toBytes(skipped ? RUN_STATUS_SKIPPED : RUN_STATUS_EXECUTED));
if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
verifyType == IndexTool.IndexVerifyType.ONLY) {
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildValidIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildValidIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildExpiredIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildExpiredIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildMissingIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildMissingIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildInvalidIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildInvalidIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
- scanMaxTs,
Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackMissingIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs,
Bytes.toBytes(Long.toString(verificationResult.getBefore().getBeyondMaxLookBackInvalidIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeIndexHasExtraCellsCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getBeforeIndexHasExtraCellsCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getBeforeIndexHasMissingCellsCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getBeforeIndexHasMissingCellsCount())));
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildValidIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildValidIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildExpiredIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildExpiredIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildMissingIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildMissingIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildInvalidIndexRowCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getAfterRebuildInvalidIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES,
- scanMaxTs,
Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackMissingIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES,
- scanMaxTs,
Bytes.toBytes(Long.toString(verificationResult.getAfter().getBeyondMaxLookBackInvalidIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasExtraCellsCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasExtraCellsCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES,
- scanMaxTs, Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasMissingCellsCount())));
+ Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasMissingCellsCount())));
}
resultTable.put(put);
}
@@ -258,6 +268,22 @@ public class IndexVerificationResultRepository implements AutoCloseable {
return verificationResult;
}
+ /**
+  * Reads the row stored under {@code oldRowKey} and rebuilds a verification result from its
+  * row key and cells.
+  *
+  * @param htable table to read the row from
+  * @param oldRowKey row key to look up
+  * @param scan scan passed to the {@code IndexToolVerificationResult} constructor
+  * @return the reconstructed result, or {@code null} when no row exists under {@code oldRowKey}
+  * @throws IOException if the read fails
+  */
+ private IndexToolVerificationResult getVerificationResult(Table htable, byte [] oldRowKey, Scan scan )
+         throws IOException {
+     Result result = htable.get(new Get(oldRowKey));
+     // HBase reports a missing row as an empty Result rather than null. Its getRow() is null,
+     // so the row key must not be split in that case.
+     if (result == null || result.isEmpty()) {
+         return null;
+     }
+     // NOTE(review): parts 3 and 4 presumably match the startRow/stopRow arguments of
+     // generateResultTableRowKey - confirm against that method's key layout.
+     byte[][] rowKeyParts = ByteUtil.splitArrayBySeparator(result.getRow(), ROW_KEY_SEPARATOR_BYTE[0]);
+     IndexToolVerificationResult verificationResult = new IndexToolVerificationResult(scan);
+     verificationResult.setStartRow(rowKeyParts[3]);
+     verificationResult.setStopRow(rowKeyParts[4]);
+     for (Cell cell : result.rawCells()) {
+         verificationResult.update(cell);
+     }
+     return verificationResult;
+ }
+
+
public void close() throws IOException {
if (resultTable != null) {
resultTable.close();
@@ -274,5 +300,13 @@ public class IndexVerificationResultRepository implements AutoCloseable {
public void setIndexTable(Table indexTable) {
this.indexTable = indexTable;
}
+
+ public IndexToolVerificationResult getVerificationResult(Long ts, Scan scan, Region region, byte[] indexTableName) throws IOException {
+ byte [] rowKey = generateResultTableRowKey(ts,
+ indexTableName, region.getRegionInfo().getRegionName(),
+ scan.getStartRow(), scan.getStopRow());
+ return getVerificationResult(resultTable, rowKey, scan);
+
+ }
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 31bfbac..f575b09 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -160,6 +160,7 @@ public final class PhoenixConfigurationUtil {
public static final String MAPREDUCE_TENANT_ID = "phoenix.mapreduce.tenantid";
private static final String INDEX_TOOL_END_TIME = "phoenix.mr.index.endtime";
private static final String INDEX_TOOL_START_TIME = "phoenix.mr.index.starttime";
+ private static final String INDEX_TOOL_LAST_VERIFY_TIME = "phoenix.mr.index.last.verify.time";
public static final String MAPREDUCE_JOB_TYPE = "phoenix.mapreduce.jobtype";
@@ -283,6 +284,12 @@ public final class PhoenixConfigurationUtil {
configuration.set(INDEX_TOOL_START_TIME, Long.toString(startTime));
}
+ /**
+  * Stores the last-verify timestamp, rendered as a decimal string, under
+  * {@code phoenix.mr.index.last.verify.time}.
+  *
+  * @param configuration configuration to write to; must not be null
+  * @param lastVerifyTime timestamp to store; must not be null
+  */
+ public static void setIndexToolLastVerifyTime(Configuration configuration, Long lastVerifyTime) {
+     Preconditions.checkNotNull(configuration);
+     final String encoded = Long.toString(Preconditions.checkNotNull(lastVerifyTime));
+     configuration.set(INDEX_TOOL_LAST_VERIFY_TIME, encoded);
+ }
+
public static void setCurrentScnValue(Configuration configuration, Long scn) {
Preconditions.checkNotNull(configuration);
Preconditions.checkNotNull(scn);
@@ -298,6 +305,11 @@ public final class PhoenixConfigurationUtil {
Preconditions.checkNotNull(configuration);
return configuration.get(CURRENT_SCN_VALUE);
}
+
+ /**
+  * Returns the unparsed string stored under {@code phoenix.mr.index.last.verify.time}.
+  *
+  * @param configuration configuration to read from; must not be null
+  */
+ public static String getIndexToolLastVerifyTime(Configuration configuration) {
+     final Configuration checked = Preconditions.checkNotNull(configuration);
+     return checked.get(INDEX_TOOL_LAST_VERIFY_TIME);
+ }
public static List<String> getUpsertColumnNames(final Configuration configuration) {
return getValues(configuration, MAPREDUCE_UPSERT_COLUMN_COUNT, MAPREDUCE_UPSERT_COLUMN_VALUE_PREFIX);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
index d82e99a..92317d4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
@@ -61,7 +61,7 @@ public class IndexToolTest extends BaseTest {
}
@Test
- public void testParseOptions_timeRange_timeRangeNotNull() {
+ public void testParseOptions_timeRange_timeRangeNotNull() throws Exception {
Long startTime = 10L;
Long endTime = 15L;
String [] args =
@@ -75,7 +75,7 @@ public class IndexToolTest extends BaseTest {
}
@Test
- public void testParseOptions_timeRange_null() {
+ public void testParseOptions_timeRange_null() throws Exception {
String [] args =
IndexToolIT.getArgValues(true, true, schema,
dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE);
@@ -86,7 +86,7 @@ public class IndexToolTest extends BaseTest {
}
@Test
- public void testParseOptions_timeRange_startTimeNotNull() {
+ public void testParseOptions_timeRange_startTimeNotNull() throws Exception {
Long startTime = 10L;
String [] args =
IndexToolIT.getArgValues(true, true, schema,
@@ -99,7 +99,7 @@ public class IndexToolTest extends BaseTest {
}
@Test
- public void testParseOptions_timeRange_endTimeNotNull() {
+ public void testParseOptions_timeRange_endTimeNotNull() throws Exception {
Long endTime = 15L;
String [] args =
IndexToolIT.getArgValues(true, true, schema,
@@ -112,7 +112,7 @@ public class IndexToolTest extends BaseTest {
}
@Test
- public void testParseOptions_timeRange_startTimeNullEndTimeInFuture() {
+ public void testParseOptions_timeRange_startTimeNullEndTimeInFuture() throws Exception {
Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
String [] args =
IndexToolIT.getArgValues(true, true, schema,
@@ -125,7 +125,7 @@ public class IndexToolTest extends BaseTest {
}
@Test
- public void testParseOptions_timeRange_endTimeNullStartTimeInFuture() {
+ public void testParseOptions_timeRange_endTimeNullStartTimeInFuture() throws Exception {
Long startTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
String [] args =
IndexToolIT.getArgValues(true, true, schema,
@@ -138,7 +138,7 @@ public class IndexToolTest extends BaseTest {
}
@Test(timeout = 10000 /* 10 secs */)
- public void testParseOptions_timeRange_startTimeInFuture() {
+ public void testParseOptions_timeRange_startTimeInFuture() throws Exception {
Long startTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 200000;
String [] args =
@@ -152,7 +152,7 @@ public class IndexToolTest extends BaseTest {
}
@Test(timeout = 10000 /* 10 secs */)
- public void testParseOptions_timeRange_endTimeInFuture() {
+ public void testParseOptions_timeRange_endTimeInFuture() throws Exception {
Long startTime = EnvironmentEdgeManager.currentTimeMillis();
Long endTime = EnvironmentEdgeManager.currentTimeMillis() + 100000;
String [] args =
@@ -166,7 +166,7 @@ public class IndexToolTest extends BaseTest {
}
@Test
- public void testParseOptions_timeRange_startTimeEqEndTime() {
+ public void testParseOptions_timeRange_startTimeEqEndTime() throws Exception {
Long startTime = 10L;
Long endTime = 10L;
String [] args =
@@ -180,7 +180,7 @@ public class IndexToolTest extends BaseTest {
}
@Test
- public void testParseOptions_timeRange_startTimeGtEndTime() {
+ public void testParseOptions_timeRange_startTimeGtEndTime() throws Exception {
Long startTime = 10L;
Long endTime = 1L;
String [] args =
@@ -198,6 +198,31 @@ public class IndexToolTest extends BaseTest {
when(pDataTable.isTransactional()).thenReturn(true);
exceptionRule.expect(RuntimeException.class);
exceptionRule.expectMessage(FEATURE_NOT_APPLICABLE);
- IndexTool.checkTimeRangeFeature(1L, 3L, pDataTable, !localIndex);
+ IndexTool.checkIfFeatureApplicable(1L, 3L, null, pDataTable, !localIndex);
+ }
+
+ @Test
+ public void testIncrcementalVerifyOption() throws Exception {
+ IndexTool mockTool = Mockito.mock(IndexTool.class);
+ when(mockTool.getLastVerifyTime()).thenCallRealMethod();
+ Long lastVerifyTime = 10L;
+ String [] args =
+ IndexToolIT.getArgValues(true, true, schema,
+ dataTable, indexTable, tenantId, IndexTool.IndexVerifyType.NONE,
+ lastVerifyTime);
+ when(mockTool.parseOptions(args)).thenCallRealMethod();
+
+ CommandLine cmdLine = mockTool.parseOptions(args);
+
+ when(mockTool.populateIndexToolAttributes(cmdLine)).thenCallRealMethod();
+ when(mockTool.isValidLastVerifyTime(lastVerifyTime)).thenReturn(true);
+
+ mockTool.populateIndexToolAttributes(cmdLine);
+ Assert.assertEquals(lastVerifyTime, mockTool.getLastVerifyTime());
+
+ when(pDataTable.isTransactional()).thenReturn(true);
+ exceptionRule.expect(RuntimeException.class);
+ exceptionRule.expectMessage(FEATURE_NOT_APPLICABLE);
+ IndexTool.checkIfFeatureApplicable(null, null, lastVerifyTime, pDataTable, !localIndex);
}
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java
index a87a4e0..0ce54fb 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexUpgradeToolTest.java
@@ -97,7 +97,7 @@ public class IndexUpgradeToolTest {
}
@Test
- public void testIfOptionsArePassedToIndexTool() {
+ public void testIfOptionsArePassedToIndexTool() throws Exception {
if (!upgrade) {
return;
}
@@ -126,7 +126,7 @@ public class IndexUpgradeToolTest {
}
@Test
- public void testMalformedSpacingOptionsArePassedToIndexTool() {
+ public void testMalformedSpacingOptionsArePassedToIndexTool() throws Exception {
if (!upgrade) {
return;
}
@@ -152,7 +152,7 @@ public class IndexUpgradeToolTest {
}
@Test(expected = IllegalStateException.class)
- public void testBadIndexToolOptions() {
+ public void testBadIndexToolOptions() throws Exception {
String [] indexToolOpts = {"-v" + DUMMY_VERIFY_VALUE};
String indexToolarg = String.join(" ", indexToolOpts);
String [] args = {"-o", UPGRADE_OP, "-tb", INPUT_LIST, "-rb", "-tool", indexToolarg };
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java
new file mode 100644
index 0000000..8cc1970
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/ShouldVerifyTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code IndexRebuildRegionScanner.shouldVerify}. The scanner itself is a
+ * Mockito mock whose {@code shouldVerify} is stubbed to run the real implementation; all of
+ * its collaborators are mocks as well.
+ */
+public class ShouldVerifyTest {
+
+ @Mock IndexRebuildRegionScanner scanner;
+ @Mock IndexMaintainer im;
+ @Mock Scan scan;
+ @Mock Region region;
+ @Mock IndexVerificationResultRepository resultRepository;
+ // Passed to shouldVerify as the index row key; reset to null before every test.
+ byte[] indexRowKey;
+ @Mock IndexToolVerificationResult verificationResult;
+
+ /**
+ * Creates the mocks, gives the index maintainer a fixed table name and routes
+ * {@code shouldVerify} on the mocked scanner to the real method.
+ */
+ @Before
+ public void setup() throws IOException {
+ MockitoAnnotations.initMocks(this);
+ indexRowKey = null;
+ when(im.getIndexTableName()).thenReturn(Bytes.toBytes("indexName"));
+ when(scanner.shouldVerify(any(IndexTool.IndexVerifyType.class), Matchers.<byte[]>any(), any(Scan.class),
+ any(Region.class), any(IndexMaintainer.class),
+ any(IndexVerificationResultRepository.class))).thenCallRealMethod();
+ }
+
+ /** With a non-null index row key and verify type ONLY, verification is expected. */
+ @Test
+ public void testShouldVerify_repair_true() throws IOException {
+ indexRowKey = new byte[5];
+ Assert.assertTrue(scanner.shouldVerify(IndexTool.IndexVerifyType.ONLY, indexRowKey, scan, region, im, resultRepository));
+ }
+
+ /**
+ * With a non-null index row key and the retry-verify attribute set on the scan,
+ * verification is expected for NONE, BEFORE and AFTER.
+ */
+ @Test
+ public void testShouldVerify_repair_rebuild_true() throws IOException {
+ indexRowKey = new byte[5];
+ when(scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY)).thenReturn(Bytes.toBytes(1L));
+ assertShouldVerify(true);
+ }
+
+ // Asserts the same expected outcome for the NONE, BEFORE and AFTER verify types.
+ private void assertShouldVerify(boolean assertion) throws IOException {
+ Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.NONE, indexRowKey, scan, region, im, resultRepository));
+ Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.BEFORE, indexRowKey, scan, region, im, resultRepository));
+ Assert.assertEquals(assertion, scanner.shouldVerify(IndexTool.IndexVerifyType.AFTER, indexRowKey, scan, region, im, resultRepository));
+ }
+
+ /**
+ * Retry-verify attribute set to 1 and the repository returns a stored result for that
+ * timestamp: no verification is expected.
+ */
+ @Test
+ public void testShouldVerify_false() throws IOException {
+ when(scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY)).thenReturn(Bytes.toBytes(1L));
+ when(resultRepository.getVerificationResult(1L, scan, region, im.getIndexTableName())).thenReturn(verificationResult);
+ assertShouldVerify(false);
+ }
+
+ /**
+ * Retry-verify attribute set to 1 but the repository returns null for that timestamp:
+ * verification is expected.
+ */
+ @Test
+ public void testShouldVerify_rebuild_true() throws IOException {
+ when(scan.getAttribute(UngroupedAggregateRegionObserver.INDEX_RETRY_VERIFY)).thenReturn(Bytes.toBytes(1L));
+ when(resultRepository.getVerificationResult(1L, scan, region, im.getIndexTableName())).thenReturn(null);
+ assertShouldVerify(true);
+ }
+
+ /**
+ * No retry-verify attribute is stubbed on the scan: verification is expected even though
+ * the repository would return a stored result.
+ */
+ @Test
+ public void testShouldVerify_noTime_true() throws IOException {
+ when(resultRepository.getVerificationResult(1L, scan, region, im.getIndexTableName())).thenReturn(verificationResult);
+ assertShouldVerify(true);
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index f58605f..840e9d5 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -301,4 +301,15 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
Long.parseLong(PhoenixConfigurationUtil.getCurrentScnValue(configuration)));
}
+
+ /** Writes a last-verify timestamp into a fresh configuration and reads the same value back. */
+ @Test
+ public void testLastVerifyTimeConfig() {
+     final Long expected = 2L;
+     final Configuration conf = new Configuration();
+     PhoenixConfigurationUtil.setIndexToolLastVerifyTime(conf, expected);
+
+     final String stored = PhoenixConfigurationUtil.getIndexToolLastVerifyTime(conf);
+     Assert.assertEquals(expected.longValue(), Long.parseLong(stored));
+ }
}