Posted to commits@phoenix.apache.org by vj...@apache.org on 2021/07/23 15:16:01 UTC
[phoenix] branch 5.1 updated: PHOENIX-6476 Index tool when verifying from index to data doesn't correctly split page into tasks (#1240) (#1248)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
new dc79b58 PHOENIX-6476 Index tool when verifying from index to data doesn't correctly split page into tasks (#1240) (#1248)
dc79b58 is described below
commit dc79b58ac6c1a321b6eeecc9f3946df66f75226d
Author: tkhurana <kh...@gmail.com>
AuthorDate: Wed Jun 16 15:58:13 2021 -0700
PHOENIX-6476 Index tool when verifying from index to data doesn't correctly split page into tasks (#1240) (#1248)
* PHOENIX-6476 Index tool when verifying from index to data doesn't correctly split page into tasks
* Fix failing tests, add comments
* Break early when the matching set is found
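For illustration, a minimal standalone sketch of the splitting step described above, with simplified generic types. The actual IndexRepairRegionScanner keys on byte[] row keys ordered by Bytes.BYTES_COMPARATOR and derives the data row key via IndexMaintainer#buildDataRowKey; the class, method, and parameter names below are hypothetical, not the Phoenix API.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical sketch: route each index row's mutations to the task whose
// data-row-key set contains the data row key derived from that index row key.
final class PerTaskSplitSketch {
    static <K, V> List<Map<K, V>> splitByTask(Map<K, V> indexRows,
                                              List<Set<K>> perTaskDataRowKeys,
                                              Function<K, K> indexKeyToDataKey,
                                              Comparator<K> keyOrder) {
        List<Map<K, V>> perTaskMaps = new ArrayList<>(perTaskDataRowKeys.size());
        for (int i = 0; i < perTaskDataRowKeys.size(); i++) {
            perTaskMaps.add(new TreeMap<>(keyOrder)); // one map per verification task
        }
        for (Map.Entry<K, V> entry : indexRows.entrySet()) {
            K dataRowKey = indexKeyToDataKey.apply(entry.getKey());
            for (int i = 0; i < perTaskDataRowKeys.size(); i++) {
                if (perTaskDataRowKeys.get(i).contains(dataRowKey)) {
                    perTaskMaps.get(i).put(entry.getKey(), entry.getValue());
                    break; // break early once the matching set is found
                }
            }
        }
        return perTaskMaps;
    }
}

In the patch below, verifyAndOrRepairIndex builds these per-task maps via the new getPerTaskIndexMutationMap and hands each verification task only its own slice of the index mutation map instead of the full map.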
---
.../end2end/IndexRepairRegionScannerIT.java | 43 ++++++++++++++++++----
.../coprocessor/IndexRepairRegionScanner.java | 33 ++++++++++++++++-
2 files changed, 67 insertions(+), 9 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
index 7ca92b0..4440ada 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
@@ -29,9 +29,11 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
import org.apache.phoenix.coprocessor.IndexRepairRegionScanner;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.index.IndexMaintainer;
@@ -61,6 +63,8 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
@@ -96,6 +100,7 @@ import static org.junit.Assume.assumeTrue;
@RunWith(Parameterized.class)
public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScannerIT.class);
private final String tableDDLOptions;
private final String indexDDLOptions;
private boolean mutable;
@@ -133,9 +138,11 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
@BeforeClass
public static synchronized void doSetup() throws Exception {
// below settings are needed to enforce major compaction
- Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(0));
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+ // to force multiple verification tasks to be spawned so that we can exercise the page splitting logic
+ props.put(GlobalIndexRegionScanner.INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY, Long.toString(2));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -224,6 +231,13 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
getUtility().getHBaseAdmin().truncateTable(TableName.valueOf(RESULT_TABLE_NAME), true);
}
+ private void dumpIndexToolMRJobCounters(IndexTool indexTool) throws IOException {
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+ for (Counter counter : mrJobCounters) {
+ LOGGER.info(String.format("%s=%d", counter.getName(), counter.getValue()));
+ }
+ }
+
private void assertExtraCounters(IndexTool indexTool, long extraVerified, long extraUnverified,
boolean isBefore) throws IOException {
CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
@@ -241,7 +255,7 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
}
}
- private void assertDisableLogging(Connection conn, int expectedRows,
+ private void assertDisableLogging(Connection conn, int expectedExtraRows, int expectedPITRows,
IndexTool.IndexVerifyType verifyType,
IndexTool.IndexDisableLoggingType disableLoggingType,
byte[] expectedPhase,
@@ -254,19 +268,32 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
null,
expectedStatus, verifyType, disableLoggingType, "-fi");
assertNotNull(tool);
- byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
+ try {
+ assertExtraCounters(tool, expectedExtraRows, 0, true);
+ } catch (AssertionError e) {
+ dumpIndexToolMRJobCounters(tool);
+ throw e;
+ }
+
+ byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
IndexVerificationOutputRepository outputRepository =
new IndexVerificationOutputRepository(indexTableFullNameBytes, conn);
List<IndexVerificationOutputRow> rows =
outputRepository.getAllOutputRows();
try {
- assertEquals(expectedRows, rows.size());
+ if (expectedPITRows == 0) {
+ assertTrue(rows.isEmpty());
+ } else {
+ // https://issues.apache.org/jira/browse/HBASE-17361 HTable#Put() is not threadsafe
+ // in releases < HBase 2.0 so occasionally we may fail to add some rows to PIT table
+ assertTrue(expectedPITRows >= rows.size());
+ }
} catch (AssertionError e) {
TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
throw e;
}
- if (expectedRows > 0) {
+ if (expectedPITRows > 0) {
assertArrayEquals(expectedPhase, rows.get(0).getPhaseValue());
}
}
@@ -798,20 +825,20 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
// run the index MR job as ONLY so the index doesn't get rebuilt. Should be NROWS number
// of extra rows. We pass in --disable-logging BEFORE to silence the output logging to
// PHOENIX_INDEX_TOOL
- assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.ONLY,
+ assertDisableLogging(conn, NROWS, 0, IndexTool.IndexVerifyType.ONLY,
IndexTool.IndexDisableLoggingType.BEFORE, null, schemaName, dataTableName, indexTableName,
indexTableFullName, 0);
truncateIndexToolTables();
// logging to PHOENIX_INDEX_TOOL enabled
- assertDisableLogging(conn, NROWS, IndexTool.IndexVerifyType.ONLY,
+ assertDisableLogging(conn, NROWS, NROWS, IndexTool.IndexVerifyType.ONLY,
IndexTool.IndexDisableLoggingType.NONE,
IndexVerificationOutputRepository.PHASE_BEFORE_VALUE,schemaName,
dataTableName, indexTableName,
indexTableFullName, 0);
truncateIndexToolTables();
- assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BEFORE,
+ assertDisableLogging(conn, NROWS, 0, IndexTool.IndexVerifyType.BEFORE,
IndexTool.IndexDisableLoggingType.BEFORE,
null, schemaName,
dataTableName, indexTableName,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index a3bba6c..ee41dc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -30,8 +30,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
@@ -279,6 +281,7 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
regionIndex++;
}
for (byte[] dataRowKey: dataRowKeys) {
+ indexKey = dataRowKey;
if (perTaskDataRowKeys.size() == maxSetSize ||
(regionIndex < regionCount - 1 && Bytes.BYTES_COMPARATOR.compare(indexKey, endKeys[regionIndex]) > 0)) {
perTaskDataRowKeys = new TreeSet<>(Bytes.BYTES_COMPARATOR);
@@ -302,6 +305,33 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
return dataRowKeys;
}
+ /**
+ * @param indexMutationMap actual index mutations for a page
+ * @param dataRowKeysSetList List of per-task data row keys
+ * @return For each set of data row keys, split the actual index mutation map into
+ * a per-task index mutation map and return the list of all index mutation maps.
+ */
+ private List<Map<byte[], List<Mutation>>> getPerTaskIndexMutationMap(
+ Map<byte[], List<Mutation>> indexMutationMap, List<Set<byte[]>> dataRowKeysSetList) {
+ List<Map<byte[], List<Mutation>>> mapList = Lists.newArrayListWithExpectedSize(dataRowKeysSetList.size());
+ for (int i = 0; i < dataRowKeysSetList.size(); ++i) {
+ Map<byte[], List<Mutation>> perTaskIndexMutationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ mapList.add(perTaskIndexMutationMap);
+ }
+ for (Map.Entry<byte[], List<Mutation>> entry : indexMutationMap.entrySet()) {
+ byte[] indexRowKey = entry.getKey();
+ List<Mutation> actualMutationList = entry.getValue();
+ byte[] dataRowKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRowKey), viewConstants);
+ for (int i = 0; i < dataRowKeysSetList.size(); ++i) {
+ if (dataRowKeysSetList.get(i).contains(dataRowKey)) {
+ mapList.get(i).put(indexRowKey, actualMutationList);
+ break;
+ }
+ }
+ }
+ return mapList;
+ }
+
private void verifyAndOrRepairIndex(Map<byte[], List<Mutation>> actualIndexMutationMap) throws IOException {
if (actualIndexMutationMap.size() == 0) {
return;
@@ -309,13 +339,14 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
Set<byte[]> dataRowKeys = getDataRowKeys(actualIndexMutationMap);
List<Set<byte[]>> setList = getPerTaskDataRowKeys((TreeSet<byte[]>) dataRowKeys,
regionEndKeys, rowCountPerTask);
+ List<Map<byte[], List<Mutation>>> indexMutationMapList = getPerTaskIndexMutationMap(actualIndexMutationMap, setList);
int taskCount = setList.size();
TaskBatch<Boolean> tasks = new TaskBatch<>(taskCount);
List<IndexToolVerificationResult> verificationResultList = new ArrayList<>(taskCount);
for (int i = 0; i < taskCount; i++) {
IndexToolVerificationResult perTaskVerificationResult = new IndexToolVerificationResult(scan);
verificationResultList.add(perTaskVerificationResult);
- addRepairAndOrVerifyTask(tasks, setList.get(i), actualIndexMutationMap, perTaskVerificationResult);
+ addRepairAndOrVerifyTask(tasks, setList.get(i), indexMutationMapList.get(i), perTaskVerificationResult);
}
submitTasks(tasks);
if (verify) {