You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/01/15 22:41:10 UTC
[phoenix] branch 4.x-HBase-1.5 updated: PHOENIX-5674 IndexTool to
not write already correct index rows/CFs
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
new d1e35ca PHOENIX-5674 IndexTool to not write already correct index rows/CFs
d1e35ca is described below
commit d1e35ca2390a263bc182baeeb37cd6dbade5992d
Author: Kadir <ko...@salesforce.com>
AuthorDate: Tue Jan 14 15:47:33 2020 -0800
PHOENIX-5674 IndexTool to not write already correct index rows/CFs
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 80 +++++-
.../coprocessor/IndexRebuildRegionScanner.java | 280 +++++++++++++--------
.../apache/phoenix/mapreduce/index/IndexTool.java | 15 +-
3 files changed, 262 insertions(+), 113 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 0edc3c4..490ecb5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -37,26 +37,29 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -377,6 +380,24 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
}
+    /**
+     * Test-only region observer that counts the mutations passed to preBatchMutate on the
+     * table it is installed on. Tests read the count with getMutationCount() and reset it
+     * with setMutationCount(0).
+     */
+    public static class MutationCountingRegionObserver extends SimpleRegionObserver {
+        // Static, so the count is shared by every instance of this observer in the JVM
+        public static final AtomicInteger mutationCount = new AtomicInteger(0);
+
+        /**
+         * Sets the mutation counter to the given value (tests pass 0 to reset it).
+         *
+         * @param value the new counter value
+         */
+        public static void setMutationCount(int value) {
+            mutationCount.set(value);
+        }
+
+        /** Returns the number of mutations counted since the counter was last set. */
+        public static int getMutationCount() {
+            return mutationCount.get();
+        }
+
+        @Override
+        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
+            // Count every mutation in the mini-batch, whatever its type
+            mutationCount.addAndGet(miniBatchOp.size());
+        }
+    }
+
private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
throws Exception {
byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
@@ -415,6 +436,53 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
+    public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
+        // This test is for building non-transactional global indexes with direct api. It checks that
+        // the "-v BEFORE" and "-v BOTH" options do not rewrite index rows that are already correct.
+        if (localIndex || transactional || !directApi || useSnapshot) {
+            return;
+        }
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String dataTableName = generateUniqueName();
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            String indexTableName = generateUniqueName();
+            String viewName = generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                    + tableDDLOptions);
+            conn.commit();
+            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+            conn.commit();
+            // Insert a row
+            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+            // Count the mutations written to the "_IDX_" table that backs the view index
+            TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, MutationCountingRegionObserver.class);
+            // The counter is static, so start from a known value before the first index tool run
+            MutationCountingRegionObserver.setMutationCount(0);
+            // Run the index MR job and verify that the index table rebuild succeeds
+            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.AFTER);
+            // The single data row yields exactly one index mutation
+            assertEquals(1, MutationCountingRegionObserver.getMutationCount());
+            MutationCountingRegionObserver.setMutationCount(0);
+            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should
+            // not write any index rows
+            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.BEFORE);
+            assertEquals(0, MutationCountingRegionObserver.getMutationCount());
+            // The "-v BOTH" option should not write any index rows either
+            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.BOTH);
+            assertEquals(0, MutationCountingRegionObserver.getMutationCount());
+            // Delete the output table for the next test
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+        }
+    }
+
+ @Test
public void testIndexToolVerifyAfterOption() throws Exception {
// This test is for building non-transactional global indexes with direct api
if (localIndex || transactional || !directApi || useSnapshot) {
@@ -447,7 +515,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
null, -1, IndexTool.IndexVerifyType.AFTER);
// The index tool output table should report that there is a missing index row
Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
- byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+ byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
expectedValueBytes, 0, expectedValueBytes.length) == 0);
IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
@@ -484,7 +552,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
null, 0, IndexTool.IndexVerifyType.ONLY);
Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
- byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+ byte[] expectedValueBytes = Bytes.toBytes("Missing index row");
assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
expectedValueBytes, 0, expectedValueBytes.length) == 0);
// Delete the output table for the next test
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 142871b..b62b215 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -110,8 +110,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
private Table outputHTable = null;
private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private boolean verify = false;
- private boolean onlyVerify = false;
+ private boolean doNotFail = false;
private Map<byte[], Put> indexKeyToDataPutMap;
+ private Map<byte[], Put> dataKeyToDataPutMap;
private TaskRunner pool;
private TaskBatch<Boolean> tasks;
private String exceptionMessage;
@@ -140,6 +141,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
if (!scan.isRaw()) {
+ // No need to deserialize index maintainers when the scan is raw. Raw scan is used by partial rebuilds
List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
indexMaintainer = maintainers.get(0);
}
@@ -152,15 +154,14 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
if (valueBytes != null) {
verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
- if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.ONLY) {
+ if (verifyType != IndexTool.IndexVerifyType.NONE) {
verify = true;
- if (verifyType == IndexTool.IndexVerifyType.ONLY) {
- onlyVerify = true;
- }
+ // Create the following objects only for rebuilds by IndexTool
hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ dataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
new ThreadPoolBuilder("IndexVerify",
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
@@ -201,28 +202,31 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
m.setDurability(Durability.SKIP_WAL);
}
- private Delete generateDeleteMarkers(List<Cell> row) {
+ private Delete generateDeleteMarkers(Put put) {
Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
- if (row.size() == allColumns.size() + 1) {
+ int cellCount = put.size();
+ if (cellCount == allColumns.size() + 1) {
// We have all the columns for the index table plus the empty column. So, no delete marker is needed
return null;
}
- Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
+ Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(cellCount);
long ts = 0;
- for (Cell cell : row) {
- includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
- if (ts < cell.getTimestamp()) {
- ts = cell.getTimestamp();
+ for (List<Cell> cells : put.getFamilyCellMap().values()) {
+ if (cells == null) {
+ break;
+ }
+ for (Cell cell : cells) {
+ includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
+ if (ts < cell.getTimestamp()) {
+ ts = cell.getTimestamp();
+ }
}
}
- byte[] rowKey;
Delete del = null;
for (ColumnReference column : allColumns) {
if (!includedColumns.contains(column)) {
if (del == null) {
- Cell cell = row.get(0);
- rowKey = CellUtil.cloneRow(cell);
- del = new Delete(rowKey);
+ del = new Delete(put.getRow());
}
del.addColumns(column.getFamily(), column.getQualifier(), ts);
}
@@ -238,15 +242,12 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
}
- private byte[] commitIfReady(byte[] uuidValue) throws IOException {
- if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+ // Commits mutationList to the region when ServerUtil.readyToCommit(...) returns true for the list's
+ // current size and byte size (presumably once maxBatchSize or maxBatchSizeBytes is reached).
+ // It checks for region closing first. After a commit, mutationList is cleared and a new id from
+ // ServerCacheClient.generateId() is returned; otherwise uuidValue is returned unchanged.
+ private byte[] commitIfReady(byte[] uuidValue, UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+ if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
ungroupedAggregateRegionObserver.checkForRegionClosing();
- ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+ ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutationList, blockingMemstoreSize);
uuidValue = ServerCacheClient.generateId();
- if (verify) {
- addToBeVerifiedIndexRows();
- }
- mutations.clear();
+ mutationList.clear();
}
return uuidValue;
}
@@ -367,10 +368,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return ts;
}
- private void verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
- ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+ private long getMaxTimestamp(Put put) {
long ts = 0;
- for (List<Cell> cells : dataRow.getFamilyCellMap().values()) {
+ for (List<Cell> cells : put.getFamilyCellMap().values()) {
if (cells == null) {
break;
}
@@ -380,19 +380,25 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
}
}
+ return ts;
+ }
+
+ private boolean verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
+ ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+ long ts = getMaxTimestamp(dataRow);
Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
if (indexPut == null) {
// This means the index row does not have any covered columns. We just need to check if the index row
// has only one cell (which is the empty column cell)
if (indexRow.rawCells().length == 1) {
- return;
+ return true;
}
String errorMsg = "Expected to find only empty column cell but got "
+ indexRow.rawCells().length;
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
- if (onlyVerify) {
- return;
+ if (doNotFail) {
+ return false;
}
exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
throw new IOException(exceptionMessage);
@@ -418,8 +424,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
Bytes.toString(qualifier);
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
- if (onlyVerify) {
- return;
+ if (doNotFail) {
+ return false;
}
exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
throw new IOException(exceptionMessage);
@@ -430,8 +436,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
Bytes.toString(qualifier);
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
- if (onlyVerify) {
- return;
+ if (doNotFail) {
+ return false;
}
exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
throw new IOException(exceptionMessage);
@@ -443,14 +449,16 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
String errorMsg = "Expected to find " + cellCount + " cells but got "
+ indexRow.rawCells().length + " cells";
logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
- if (!onlyVerify) {
+ if (!doNotFail) {
exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
+ return false;
}
+ return true;
}
- private void verifyIndexRows(ArrayList<KeyRange> keys) throws IOException {
+ private void verifyIndexRows(ArrayList<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap) throws IOException {
int expectedRowCount = keys.size();
ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
Scan indexScan = new Scan();
@@ -463,31 +471,57 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
Put dataPut = indexKeyToDataPutMap.get(result.getRow());
if (dataPut == null) {
- exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
String errorMsg = "Missing data row";
logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
- if (!onlyVerify) {
+ if (!doNotFail) {
+ exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
}
- verifySingleIndexRow(result, dataPut);
+ if (verifySingleIndexRow(result, dataPut)) {
+ perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
+ }
rowCount++;
}
} catch (Throwable t) {
ServerUtil.throwIOException(indexHTable.getName().toString(), t);
}
if (rowCount != expectedRowCount) {
- String errorMsg = "Missing index rows - Expected: " + expectedRowCount +
- " Actual: " + rowCount;
+ for (Map.Entry<byte[], Put> entry : perTaskDataKeyToDataPutMap.entrySet()) {
+ String errorMsg = "Missing index row";
+ logToIndexToolOutputTable(entry.getKey(), null, getMaxTimestamp(entry.getValue()),
+ 0, errorMsg);
+ if (!doNotFail) {
exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
- logToIndexToolOutputTable(null, null, 0, 0, errorMsg);
- if (!onlyVerify) {
- throw new IOException(exceptionMessage);
+ throw new IOException(exceptionMessage);
+ }
+ }
+ }
+ }
+
+    /**
+     * Rebuilds index rows by re-committing the given data table mutations to this region through
+     * commitBatchWithRetries.
+     *
+     * Mutations are accumulated into batches that commitIfReady commits. Each mutation is tagged
+     * with the current batch id via setMutationAttributes, and the id changes after every commit.
+     * For a Put, any delete markers returned by generateDeleteMarkers are queued right after it.
+     * Whatever remains at the end is committed in a final batch.
+     *
+     * @param mutationList the mutations to replay; this method reads it but does not modify it
+     * @throws IOException if the region-closing check or a batch commit fails
+     */
+    private void rebuildIndexRows(UngroupedAggregateRegionObserver.MutationList mutationList) throws IOException {
+        byte[] uuidValue = ServerCacheClient.generateId();
+        UngroupedAggregateRegionObserver.MutationList currentMutationList =
+                new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        for (Mutation mutation : mutationList) {
+            currentMutationList.add(mutation);
+            setMutationAttributes(mutation, uuidValue);
+            uuidValue = commitIfReady(uuidValue, currentMutationList);
+            if (!(mutation instanceof Put)) {
+                // generateDeleteMarkers only applies to Puts; any other mutation (e.g. a Delete) is
+                // replayed as is instead of failing with a ClassCastException
+                continue;
+            }
+            Delete deleteMarkers = generateDeleteMarkers((Put) mutation);
+            if (deleteMarkers != null) {
+                setMutationAttributes(deleteMarkers, uuidValue);
+                currentMutationList.add(deleteMarkers);
+                uuidValue = commitIfReady(uuidValue, currentMutationList);
+            }
+        }
+        if (!currentMutationList.isEmpty()) {
+            ungroupedAggregateRegionObserver.checkForRegionClosing();
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, currentMutationList, blockingMemstoreSize);
+        }
+    }
- private void addVerifyTask(final ArrayList<KeyRange> keys) {
+ private void addVerifyTask(final ArrayList<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap) {
tasks.add(new Task<Boolean>() {
@Override
public Boolean call() throws Exception {
@@ -496,7 +530,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
throw new IOException(exceptionMessage);
}
- verifyIndexRows(keys);
+ verifyIndexRows(keys, perTaskDataKeyToDataPutMap);
+ if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
+ synchronized (dataKeyToDataPutMap) {
+ dataKeyToDataPutMap.putAll(perTaskDataKeyToDataPutMap);
+ }
+ }
+ perTaskDataKeyToDataPutMap.clear();
} catch (Exception e) {
throw e;
}
@@ -505,11 +545,81 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
});
}
+    /**
+     * Verifies the index rows that correspond to the data rows collected for this page.
+     *
+     * After addToBeVerifiedIndexRows() fills indexKeyToDataPutMap, its index keys are grouped into
+     * chunks of rowCountPerTask. Each chunk becomes one verify task, together with a map from data
+     * row key to the data Put for the rows in that chunk. The queued tasks are then run on the
+     * verify pool, and this method waits for all of them to finish.
+     *
+     * @throws IOException if any task returned a null result (a failure); the message is taken from
+     *                     exceptionMessage
+     */
+    private void parallelizeIndexVerify() throws IOException {
+        addToBeVerifiedIndexRows();
+        ArrayList<KeyRange> chunkKeys = new ArrayList<>(rowCountPerTask);
+        Map<byte[], Put> chunkDataPuts = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        for (Map.Entry<byte[], Put> indexEntry : indexKeyToDataPutMap.entrySet()) {
+            Put dataPut = indexEntry.getValue();
+            chunkKeys.add(PVarbinary.INSTANCE.getKeyRange(indexEntry.getKey()));
+            chunkDataPuts.put(dataPut.getRow(), dataPut);
+            if (chunkKeys.size() == rowCountPerTask) {
+                // Chunk is full: hand it to a task and start a fresh one
+                addVerifyTask(chunkKeys, chunkDataPuts);
+                chunkKeys = new ArrayList<>(rowCountPerTask);
+                chunkDataPuts = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+            }
+        }
+        if (!chunkKeys.isEmpty()) {
+            addVerifyTask(chunkKeys, chunkDataPuts);
+        }
+        List<Boolean> outcomes = awaitIndexVerifyTasks();
+        for (Boolean outcome : outcomes) {
+            if (outcome == null) {
+                // A null result marks a failed task
+                throw new IOException(exceptionMessage);
+            }
+        }
+    }
+
+    // Runs the queued verify tasks on the pool, waits for them, and always clears the task list afterwards
+    private List<Boolean> awaitIndexVerifyTasks() throws IOException {
+        try {
+            LOGGER.debug("Waiting on index verify tasks to complete...");
+            return this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+        } finally {
+            tasks.getTasks().clear();
+        }
+    }
+
+ private void verifyAndOrRebuildIndex() throws IOException {
+ if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.NONE) {
+ // For these options we start with rebuilding index rows
+ rebuildIndexRows(mutations);
+ }
+ if (verifyType == IndexTool.IndexVerifyType.NONE) {
+ return;
+ }
+ if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH ||
+ verifyType == IndexTool.IndexVerifyType.ONLY) {
+ // For these options we start with verifying index rows
+ doNotFail = true; // Don't stop at the first mismatch
+ parallelizeIndexVerify();
+ }
+ if (verifyType == IndexTool.IndexVerifyType.BEFORE || verifyType == IndexTool.IndexVerifyType.BOTH) {
+ // For these options, we have identified the rows to be rebuilt and now need to rebuild them
+ // At this point, dataKeyToDataPutMap includes mapping only for the rows to be rebuilt
+ mutations.clear();
+ for (Map.Entry<byte[], Put> entry: dataKeyToDataPutMap.entrySet()) {
+ mutations.add(entry.getValue());
+ }
+ rebuildIndexRows(mutations);
+ }
+
+ if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
+ // We have rebuilt index row and now we need to verify them
+ doNotFail = false; // Stop at the first mismatch
+ indexKeyToDataPutMap.clear();
+ parallelizeIndexVerify();
+ }
+ indexKeyToDataPutMap.clear();
+ }
+
@Override
public boolean next(List<Cell> results) throws IOException {
int rowCount = 0;
region.startRegionOperation();
try {
+ // Partial rebuilds by MetadataRegionObserver use raw scan. Inline verification is not supported for them
+ boolean partialRebuild = scan.isRaw();
byte[] uuidValue = ServerCacheClient.generateId();
synchronized (innerScanner) {
do {
@@ -522,55 +632,58 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
if (put == null) {
put = new Put(CellUtil.cloneRow(cell));
- setMutationAttributes(put, uuidValue);
mutations.add(put);
}
put.add(cell);
} else {
if (del == null) {
del = new Delete(CellUtil.cloneRow(cell));
- setMutationAttributes(del, uuidValue);
mutations.add(del);
}
del.addDeleteMarker(cell);
}
}
- if (onlyVerify) {
- rowCount++;
- continue;
+ if (partialRebuild) {
+ if (put != null) {
+ setMutationAttributes(put, uuidValue);
+ }
+ if (del != null) {
+ setMutationAttributes(del, uuidValue);
+ }
+ uuidValue = commitIfReady(uuidValue, mutations);
}
- uuidValue = commitIfReady(uuidValue);
- if (!scan.isRaw()) {
- Delete deleteMarkers = generateDeleteMarkers(row);
+ if (indexRowKey != null) {
+ if (put != null) {
+ setMutationAttributes(put, uuidValue);
+ }
+ Delete deleteMarkers = generateDeleteMarkers(put);
if (deleteMarkers != null) {
setMutationAttributes(deleteMarkers, uuidValue);
mutations.add(deleteMarkers);
- uuidValue = commitIfReady(uuidValue);
+ uuidValue = commitIfReady(uuidValue, mutations);
}
- }
- if (indexRowKey != null) {
// GlobalIndexChecker passed the index row key. This is to build a single index row.
// Check if the data table row we have just scanned matches with the index row key.
// If not, there is no need to build the index row from this data table row,
// and just return zero row count.
if (checkIndexRow(indexRowKey, put)) {
rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
- }
- else {
+ } else {
rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
}
break;
}
rowCount++;
}
-
} while (hasMore && rowCount < pageSizeInRows);
- if (!mutations.isEmpty() && !onlyVerify) {
+ }
+ if (!partialRebuild && indexRowKey == null) {
+ verifyAndOrRebuildIndex();
+ }
+ else {
+ if (!mutations.isEmpty()) {
ungroupedAggregateRegionObserver.checkForRegionClosing();
ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
- if (verify) {
- addToBeVerifiedIndexRows();
- }
}
}
} catch (IOException e) {
@@ -579,42 +692,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
throw e;
} finally {
region.closeRegionOperation();
- }
- if (verify) {
- if (onlyVerify) {
- addToBeVerifiedIndexRows();
- }
- ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask);
- for (byte[] key : indexKeyToDataPutMap.keySet()) {
- keys.add(PVarbinary.INSTANCE.getKeyRange(key));
- if (keys.size() == rowCountPerTask) {
- addVerifyTask(keys);
- keys = new ArrayList<>(rowCountPerTask);
- }
- }
- if (keys.size() > 0) {
- addVerifyTask(keys);
- }
- List<Boolean> taskResultList = null;
- try {
- LOGGER.debug("Waiting on index verify tasks to complete...");
- taskResultList = this.pool.submitUninterruptible(tasks);
- } catch (ExecutionException e) {
- throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
- } catch (EarlyExitFailure e) {
- throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
- }
- finally {
- indexKeyToDataPutMap.clear();
- tasks.getTasks().clear();
- }
- for (Boolean result : taskResultList) {
- if (result == null) {
- // there was a failure
- throw new IOException(exceptionMessage);
- }
+ mutations.clear();
+ if (verify) {
+ indexKeyToDataPutMap.clear();
+ dataKeyToDataPutMap.clear();
}
}
+
byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
@@ -626,4 +710,4 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
// Reports the max result size configured on the scan field
public long getMaxResultSize() {
return scan.getMaxResultSize();
}
-}
\ No newline at end of file
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 6b77ed8..51cb4b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -203,11 +203,12 @@ public class IndexTool extends Configured implements Tool {
"This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");
+ // Help text for the -v/--verify option; its value is parsed with IndexVerifyType.fromValue
private static final Option VERIFY_OPTION = new Option("v", "verify", true,
- "To verify every data row has a corresponding row. The accepted values are NONE, ONLY, BEFORE," +
- " AFTER, and BOTH. NONE is for no inline verification, which is also the default for this option. " +
- "ONLY is for verifying without rebuilding index rows. The rest for verifying before, after, and " +
- "both before and after rebuilding row. If the verification is done before rebuilding rows and " +
- "the correct index rows are not rebuilt. Currently supported values are NONE, ONLY and AFTER ");
+ "To verify every data row has a corresponding row of a global index. For other types of indexes, " +
+ "this option will be silently ignored. The accepted values are NONE, ONLY, BEFORE, AFTER, and BOTH. " +
+ "NONE is for no inline verification, which is also the default for this option. ONLY is for " +
+ "verifying without rebuilding index rows. The rest are for verifying before, after, and both before " +
+ "and after rebuilding rows. If the verification is done before rebuilding rows, the correct " +
+ "index rows will not be rebuilt");
private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
@@ -677,10 +678,6 @@ public class IndexTool extends Configured implements Tool {
if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
indexVerifyType = IndexVerifyType.fromValue(value);
- if (!(indexVerifyType == IndexVerifyType.NONE || indexVerifyType == IndexVerifyType.AFTER ||
- indexVerifyType == IndexVerifyType.ONLY)) {
- throw new IllegalStateException("Unsupported value for the verify option");
- }
}
qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {