You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/01/13 03:14:07 UTC
[phoenix] branch master updated: PHOENIX-5658 IndexTool to verify index rows inline (addendum - restructured the code)
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 6eebf68 PHOENIX-5658 IndexTool to verify index rows inline (addendum - restructured the code)
6eebf68 is described below
commit 6eebf684513c6ed7391a4bd0c560ccb0e5367cb9
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sun Jan 12 19:14:36 2020 -0800
PHOENIX-5658 IndexTool to verify index rows inline (addendum - restructured the code)
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 38 +++----
.../coprocessor/BaseScannerRegionObserver.java | 7 +-
.../coprocessor/IndexRebuildRegionScanner.java | 114 ++++++++++++++-------
.../UngroupedAggregateRegionObserver.java | 8 +-
.../PhoenixServerBuildIndexInputFormat.java | 11 +-
.../apache/phoenix/mapreduce/index/IndexTool.java | 70 ++++++++++---
.../mapreduce/util/PhoenixConfigurationUtil.java | 15 ++-
7 files changed, 168 insertions(+), 95 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 0558c40..d5713b6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -409,7 +409,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
- public void testIndexToolVerifyOption() throws Exception {
+ public void testIndexToolVerifyAfterOption() throws Exception {
// This test is for building non-transactional global indexes with direct api
if (localIndex || transactional || !directApi || useSnapshot) {
return;
@@ -438,7 +438,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
"CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
// Run the index MR job and verify that the index table rebuild fails
runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
- null, -1, true, false);
+ null, -1, IndexTool.IndexVerifyType.AFTER);
// The index tool output table should report that there is a missing index row
Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
@@ -476,7 +476,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
// IndexTool will go through each data table row and record the mismatches in the output table
// called PHOENIX_INDEX_TOOL
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, 0, false, true);
+ null, 0, IndexTool.IndexVerifyType.ONLY);
Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
@@ -488,15 +488,15 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
admin.deleteTable(indexToolOutputTable);
// Run the index tool to populate the index while verifying rows
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, 0, true, false);
+ null, 0, IndexTool.IndexVerifyType.AFTER);
// Corrupt one cell by writing directly into the index table
conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')");
conn.commit();
// Run the index tool using the only-verify option to detect this mismatch between the data and index table
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, 0, false, true);
+ null, 0, IndexTool.IndexVerifyType.ONLY);
cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
- expectedValueBytes = Bytes.toBytes("Not matching cell value - 0:0:CODE - Expected: A - Actual: B");
+ expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
expectedValueBytes, 0, expectedValueBytes.length) == 0);
admin.disableTable(indexToolOutputTable);
@@ -780,7 +780,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indxTable, String tenantId,
- boolean verify, boolean onlyVerify) {
+ IndexTool.IndexVerifyType verifyType) {
List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -793,11 +793,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
if (directApi) {
args.add("-direct");
}
- if (verify) {
- args.add("-v"); // verify index rows inline
- } else if (onlyVerify) {
- args.add("-ov");
- }
+ args.add("-v" + verifyType.getValue()); // verify index rows inline
+
// Need to run this job in foreground for the test to be deterministic
args.add("-runfg");
if (useSnapshot) {
@@ -815,9 +812,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
- String dataTable, String indexTable, String tenantId, boolean verify, boolean onlyVerify) {
- List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, tenantId, verify, onlyVerify);
-
+ String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
+ List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, tenantId, verifyType);
return args.toArray(new String[0]);
}
@@ -856,27 +852,27 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
public static void runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
- String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
+ String dataTableName, String indexTableName, String... additionalArgs) throws Exception {
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, additionalArgs);
}
public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
- String dataTableName, String indexTableName, String tenantId, int expectedStatus,
- String... additionalArgs) throws Exception {
+ String dataTableName, String indexTableName, String tenantId, int expectedStatus,
+ String... additionalArgs) throws Exception {
return runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId, expectedStatus,
- false, false, additionalArgs);
+ IndexTool.IndexVerifyType.NONE, additionalArgs);
}
public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
String dataTableName, String indexTableName, String tenantId,
- int expectedStatus, boolean verify, boolean onlyVerify,
+ int expectedStatus, IndexTool.IndexVerifyType verifyType,
String... additionalArgs) throws Exception {
IndexTool indexingTool = new IndexTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
indexingTool.setConf(conf);
final String[] cmdArgs = getArgValues(directApi, useSnapshot, schemaName, dataTableName,
- indexTableName, tenantId, verify, onlyVerify);
+ indexTableName, tenantId, verifyType);
List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
cmdArgList.addAll(Arrays.asList(additionalArgs));
int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 9e5b2eb..272e1fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -69,10 +69,9 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
// The number of index rows to be rebuild in one RPC call
public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
- // Verify index rows that have been rebuilt inline
- public static final String INDEX_REBUILD_VERIFY = "_IndexRebuildVerify";
- // Verify index rows without rebuilding them
- public static final String INDEX_REBUILD_ONLY_VERIFY = "_IndexRebuildOnlyVerify";
+ // Index verification type done by the index tool
+ public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
+
/*
* Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
* Needed for backward compatibility purposes. TODO: get rid of this in next major release.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 1194312..60271ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -106,6 +106,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private IndexMaintainer indexMaintainer;
private byte[] indexRowKey = null;
private Table indexHTable = null;
+ private Table outputHTable = null;
+ private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private boolean verify = false;
private boolean onlyVerify = false;
private Map<byte[], Put> indexKeyToDataPutMap;
@@ -145,23 +147,28 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
this.env = env;
this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
- if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY) != null ||
- scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_ONLY_VERIFY) != null) {
- verify = true;
- if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_ONLY_VERIFY) != null) {
- onlyVerify = true;
+ byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
+ if (valueBytes != null) {
+ verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
+ if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.ONLY) {
+ verify = true;
+ if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+ onlyVerify = true;
+ }
+ indexHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
+ env).getTable(TableName.valueOf(indexMaintainer.getIndexTableName()));
+ outputHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
+ env).getTable(TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+ indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+ new ThreadPoolBuilder("IndexVerify",
+ env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
+ DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
+ INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+ tasks = new TaskBatch<>(DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS);
+ rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
+ DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
}
- indexHTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
- env).getTable(TableName.valueOf(indexMaintainer.getIndexTableName()));
- indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
- pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
- new ThreadPoolBuilder("IndexVerify",
- env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
- INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
- tasks = new TaskBatch<>(DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS);
- rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
- DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
}
}
@@ -178,6 +185,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
innerScanner.close();
if (verify) {
this.pool.stop("IndexRebuildRegionScanner is closing");
+ indexHTable.close();
+ outputHTable.close();
}
}
@@ -230,7 +239,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private byte[] commitIfReady(byte[] uuidValue) throws IOException {
if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- ungroupedAggregateRegionObserver.checkForRegionClosing();
+ ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
uuidValue = ServerCacheClient.generateId();
if (verify) {
@@ -283,15 +292,27 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg) {
- try (Table hTable = ServerUtil.ConnectionFactory.getConnection(ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION,
- env).getTable(TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME))) {
+ logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
+ errorMsg, null, null);
+
+ }
+
+ private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+ String errorMsg, byte[] expectedValue, byte[] actualValue) {
+ final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
+ final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
+ final int PREFIX_LENGTH = 3;
+ final int TOTAL_PREFIX_LENGTH = 6;
+
+ try {
+ int longLength = Long.SIZE / Byte.SIZE;
byte[] rowKey;
if (dataRowKey != null) {
- rowKey = new byte[Long.BYTES + dataRowKey.length];
+ rowKey = new byte[longLength + dataRowKey.length];
Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
- Bytes.putBytes(rowKey, Long.BYTES, dataRowKey, 0, dataRowKey.length);
+ Bytes.putBytes(rowKey, longLength, dataRowKey, 0, dataRowKey.length);
} else {
- rowKey = new byte[Long.BYTES];
+ rowKey = new byte[longLength];
Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
}
Put put = new Put(rowKey);
@@ -310,9 +331,26 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
scanMaxTs, Bytes.toBytes(indexRowTs));
}
- put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES,
- scanMaxTs, Bytes.toBytes(errorMsg));
- hTable.put(put);
+ byte[] errorMessageBytes;
+ if (expectedValue != null) {
+ errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
+ TOTAL_PREFIX_LENGTH];
+ Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
+ int length = errorMsg.length();
+ Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+ length += PREFIX_LENGTH;
+ Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
+ length += expectedValue.length;
+ Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+ length += PREFIX_LENGTH;
+ Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
+
+ }
+ else {
+ errorMessageBytes = Bytes.toBytes(errorMsg);
+ }
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+ outputHTable.put(put);
} catch (IOException e) {
exceptionMessage = "LogToIndexToolOutputTable failed " + e;
}
@@ -351,8 +389,11 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
String errorMsg = "Expected to find only empty column cell but got "
+ indexRow.rawCells().length;
- exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+ if (onlyVerify) {
+ return;
+ }
+ exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
throw new IOException(exceptionMessage);
}
else {
@@ -373,26 +414,25 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
if (actualCell == null) {
- exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
- String errorMsg = " Missing cell - " + Bytes.toStringBinary(family) + ":" +
- Bytes.toStringBinary(qualifier);
+ String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
+ Bytes.toString(qualifier);
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
if (onlyVerify) {
return;
}
+ exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
// Check all columns
if (!CellUtil.matchingValue(actualCell, expectedCell)) {
- exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
- String errorMsg = "Not matching cell value - " + Bytes.toStringBinary(family) + ":" +
- Bytes.toStringBinary(qualifier) + " - Expected: " +
- Bytes.toStringBinary(CellUtil.cloneValue(expectedCell)) + " - Actual: " +
- Bytes.toStringBinary(CellUtil.cloneValue(actualCell));
- logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+ String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
+ Bytes.toString(qualifier);
+ logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+ errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
if (onlyVerify) {
return;
}
+ exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
cellCount++;
@@ -401,9 +441,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
if (cellCount != indexRow.rawCells().length) {
String errorMsg = "Expected to find " + cellCount + " cells but got "
+ indexRow.rawCells().length + " cells";
- exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
if (!onlyVerify) {
+ exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
}
@@ -525,7 +565,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
} while (hasMore && rowCount < pageSizeInRows);
if (!mutations.isEmpty() && !onlyVerify) {
- ungroupedAggregateRegionObserver.checkForRegionClosing();
+ ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
if (verify) {
addToBeVerifiedIndexRows();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 9067dd0..f4d3ca5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -24,7 +24,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
@@ -53,7 +52,6 @@ import javax.annotation.concurrent.GuardedBy;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -95,7 +93,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
@@ -122,7 +119,6 @@ import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -306,7 +302,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
for (int i = 0; blockingMemstoreSize > 0 && (region.getMemStoreHeapSize() + region.getMemStoreOffHeapSize()) > blockingMemstoreSize
&& i < 30; i++) {
try {
- checkForRegionClosing();
+ checkForRegionClosingOrSplitting();
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -357,7 +353,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
* a high chance that flush might not proceed and memstore won't be freed up.
* @throws IOException
*/
- public void checkForRegionClosing() throws IOException {
+ public void checkForRegionClosingOrSplitting() throws IOException {
synchronized (lock) {
if(isRegionClosingOrSplitting) {
lock.notifyAll();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 8fb0eb1..b29344f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.compile.*;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -42,8 +41,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
-import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getOnlyVerifyIndex;
-import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getVerifyIndex;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
/**
@@ -94,12 +91,8 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
try {
scan.setTimeRange(0, scn);
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
- if (getVerifyIndex(configuration)) {
- scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY, TRUE_BYTES);
- }
- else if (getOnlyVerifyIndex(configuration)) {
- scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_ONLY_VERIFY, TRUE_BYTES);
- }
+ scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE,
+ PhoenixConfigurationUtil.getIndexVerifyType(configuration).toBytes());
} catch (IOException e) {
throw new SQLException(e);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index a8823af..0155602 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -97,6 +97,7 @@ import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.EquiDepthStreamHistogram;
@@ -117,6 +118,42 @@ import com.google.common.collect.Lists;
*
*/
public class IndexTool extends Configured implements Tool {
+ public enum IndexVerifyType {
+ BEFORE("BEFORE"),
+ AFTER("AFTER"),
+ BOTH("BOTH"),
+ ONLY("ONLY"),
+ NONE("NONE");
+ private String value;
+ private byte[] valueBytes;
+
+ private IndexVerifyType(String value) {
+ this.value = value;
+ this.valueBytes = PVarchar.INSTANCE.toBytes(value);
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ public byte[] toBytes() {
+ return this.valueBytes;
+ }
+
+ public static IndexVerifyType fromValue(String value) {
+ for (IndexVerifyType verifyType: IndexVerifyType.values()) {
+ if (value.equals(verifyType.getValue())) {
+ return verifyType;
+ }
+ }
+ throw new IllegalStateException("Invalid value: "+ value + " for " + IndexVerifyType.class);
+ }
+
+ public static IndexVerifyType fromValue(byte[] value) {
+ return fromValue(Bytes.toString(value));
+ }
+ }
+
public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
@@ -141,8 +178,7 @@ public class IndexTool extends Configured implements Tool {
private String dataTable;
private String indexTable;
private boolean isPartialBuild;
- private boolean verify;
- private boolean onlyVerify;
+ private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
private String qDataTable;
private String qIndexTable;
private boolean useSnapshot;
@@ -168,12 +204,12 @@ public class IndexTool extends Configured implements Tool {
private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
"This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");
- private static final Option VERIFY_OPTION = new Option("v", "verify", false,
- "To verify every index row is rebuilt correctly");
-
- private static final Option ONLY_VERIFY_OPTION = new Option("ov", "only-verify", false,
- "To verify every data table row has the corresponding index row with the correct content " +
- "(without building the index table). If the verify option is set then the only-verify option will be ignored");
+ private static final Option VERIFY_OPTION = new Option("v", "verify", true,
+ "To verify every data row has a corresponding row. The accepted values are NONE, ONLY, BEFORE," +
+ " AFTER, and BOTH. NONE is for no inline verification, which is also the default for this option. " +
+ "ONLY is for verifying without rebuilding index rows. The rest for verifying before, after, and " +
+ "both before and after rebuilding row. If the verification is done before rebuilding rows and " +
+ "the correct index rows are not rebuilt. Currently supported values are NONE, ONLY and AFTER ");
private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
@@ -221,7 +257,6 @@ public class IndexTool extends Configured implements Tool {
options.addOption(PARTIAL_REBUILD_OPTION);
options.addOption(DIRECT_API_OPTION);
options.addOption(VERIFY_OPTION);
- options.addOption(ONLY_VERIFY_OPTION);
options.addOption(RUN_FOREGROUND_OPTION);
options.addOption(OUTPUT_PATH_OPTION);
options.addOption(SNAPSHOT_OPTION);
@@ -542,12 +577,7 @@ public class IndexTool extends Configured implements Tool {
PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
- if (verify) {
- PhoenixConfigurationUtil.setVerifyIndex(configuration, true);
- }
- else if (onlyVerify) {
- PhoenixConfigurationUtil.setOnlyVerifyIndex(configuration, true);
- }
+ PhoenixConfigurationUtil.setIndexVerifyType(configuration, indexVerifyType);
String physicalIndexTable = pIndexTable.getPhysicalName().getString();
PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
@@ -650,8 +680,14 @@ public class IndexTool extends Configured implements Tool {
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
- verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
- onlyVerify = cmdLine.hasOption(ONLY_VERIFY_OPTION.getOpt());
+ if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
+ String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
+ indexVerifyType = IndexVerifyType.fromValue(value);
+ if (!(indexVerifyType == IndexVerifyType.NONE || indexVerifyType == IndexVerifyType.AFTER ||
+ indexVerifyType == IndexVerifyType.ONLY)) {
+ throw new IllegalStateException("Unsupported value for the verify option");
+ }
+ }
qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index c040840..196c842 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -43,9 +43,9 @@ import org.apache.phoenix.iterate.BaseResultIterators;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
-import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
@@ -151,6 +151,8 @@ public final class PhoenixConfigurationUtil {
public static final String ONLY_VERIFY_INDEX = "phoenix.mr.index.onlyVerifyIndex";
+ public static final String INDEX_VERIFY_TYPE = "phoenix.mr.index.IndexVerifyType";
+
// Generate splits based on scans from stats, or just from region splits
public static final String MAPREDUCE_SPLIT_BY_STATS = "phoenix.mapreduce.split.by.stats";
@@ -576,6 +578,11 @@ public final class PhoenixConfigurationUtil {
configuration.setBoolean(ONLY_VERIFY_INDEX, verify);
}
+ public static void setIndexVerifyType(Configuration configuration, IndexTool.IndexVerifyType verifyType) {
+ Preconditions.checkNotNull(configuration);
+ configuration.set(INDEX_VERIFY_TYPE, verifyType.getValue());
+ }
+
public static String getScrutinyDataTableName(Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(SCRUTINY_DATA_TABLE_NAME);
@@ -711,6 +718,12 @@ public final class PhoenixConfigurationUtil {
return configuration.getBoolean(ONLY_VERIFY_INDEX, false);
}
+ public static IndexTool.IndexVerifyType getIndexVerifyType(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ String value = configuration.get(INDEX_VERIFY_TYPE, IndexTool.IndexVerifyType.NONE.getValue());
+ return IndexTool.IndexVerifyType.fromValue(value);
+ }
+
public static boolean getSplitByStats(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
boolean split = configuration.getBoolean(MAPREDUCE_SPLIT_BY_STATS, DEFAULT_SPLIT_BY_STATS);