You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by vj...@apache.org on 2021/07/23 15:16:01 UTC

[phoenix] branch 5.1 updated: PHOENIX-6476 Index tool when verifying from index to data doesn't correctly split page into tasks (#1240) (#1248)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new dc79b58  PHOENIX-6476 Index tool when verifying from index to data doesn't correctly split page into tasks (#1240) (#1248)
dc79b58 is described below

commit dc79b58ac6c1a321b6eeecc9f3946df66f75226d
Author: tkhurana <kh...@gmail.com>
AuthorDate: Wed Jun 16 15:58:13 2021 -0700

    PHOENIX-6476 Index tool when verifying from index to data doesn't correctly split page into tasks (#1240) (#1248)
    
    * PHOENIX-6476 Index tool when verifying from index to data doesn't correctly split page into tasks
    
    * Fix failing tests, add comments
    
    * Break early when the matching set is found
---
 .../end2end/IndexRepairRegionScannerIT.java        | 43 ++++++++++++++++++----
 .../coprocessor/IndexRepairRegionScanner.java      | 33 ++++++++++++++++-
 2 files changed, 67 insertions(+), 9 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
index 7ca92b0..4440ada 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
@@ -29,9 +29,11 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.phoenix.compat.hbase.HbaseCompatCapabilities;
 import org.apache.phoenix.compat.hbase.coprocessor.CompatBaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
 import org.apache.phoenix.coprocessor.IndexRepairRegionScanner;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -61,6 +63,8 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -96,6 +100,7 @@ import static org.junit.Assume.assumeTrue;
 @RunWith(Parameterized.class)
 public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRepairRegionScannerIT.class);
     private final String tableDDLOptions;
     private final String indexDDLOptions;
     private boolean mutable;
@@ -133,9 +138,11 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
         // below settings are needed to enforce major compaction
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
         props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(0));
         props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+        // to force multiple verification tasks to be spawned so that we can exercise the page splitting logic
+        props.put(GlobalIndexRegionScanner.INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY, Long.toString(2));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -224,6 +231,13 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
         getUtility().getHBaseAdmin().truncateTable(TableName.valueOf(RESULT_TABLE_NAME), true);
     }
 
+    private void dumpIndexToolMRJobCounters(IndexTool indexTool) throws IOException {
+        CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+        for (Counter counter : mrJobCounters) {
+            LOGGER.info(String.format("%s=%d", counter.getName(), counter.getValue()));
+        }
+    }
+
     private void assertExtraCounters(IndexTool indexTool, long extraVerified, long extraUnverified,
             boolean isBefore) throws IOException {
         CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
@@ -241,7 +255,7 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
         }
     }
 
-    private void assertDisableLogging(Connection conn, int expectedRows,
+    private void assertDisableLogging(Connection conn, int expectedExtraRows, int expectedPITRows,
         IndexTool.IndexVerifyType verifyType,
         IndexTool.IndexDisableLoggingType disableLoggingType,
         byte[] expectedPhase,
@@ -254,19 +268,32 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
             null,
             expectedStatus, verifyType, disableLoggingType, "-fi");
         assertNotNull(tool);
-        byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
 
+        try {
+            assertExtraCounters(tool, expectedExtraRows, 0, true);
+        } catch (AssertionError e) {
+            dumpIndexToolMRJobCounters(tool);
+            throw e;
+        }
+
+        byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
         IndexVerificationOutputRepository outputRepository =
             new IndexVerificationOutputRepository(indexTableFullNameBytes, conn);
         List<IndexVerificationOutputRow> rows =
             outputRepository.getAllOutputRows();
         try {
-            assertEquals(expectedRows, rows.size());
+            if (expectedPITRows == 0) {
+                assertTrue(rows.isEmpty());
+            } else {
+                // https://issues.apache.org/jira/browse/HBASE-17361 HTable#Put() is not threadsafe
+                // in releases < HBase 2.0 so occasionally we may fail to add some rows to PIT table
+                assertTrue(expectedPITRows >= rows.size());
+            }
         } catch (AssertionError e) {
             TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
             throw e;
         }
-        if (expectedRows > 0) {
+        if (expectedPITRows > 0) {
             assertArrayEquals(expectedPhase, rows.get(0).getPhaseValue());
         }
     }
@@ -798,20 +825,20 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
             // run the index MR job as ONLY so the index doesn't get rebuilt. Should be NROWS number
             // of extra rows. We pass in --disable-logging BEFORE to silence the output logging to
             // PHOENIX_INDEX_TOOL
-            assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.ONLY,
+            assertDisableLogging(conn, NROWS, 0, IndexTool.IndexVerifyType.ONLY,
                 IndexTool.IndexDisableLoggingType.BEFORE, null, schemaName, dataTableName, indexTableName,
                 indexTableFullName, 0);
             truncateIndexToolTables();
 
             // logging to PHOENIX_INDEX_TOOL enabled
-            assertDisableLogging(conn, NROWS, IndexTool.IndexVerifyType.ONLY,
+            assertDisableLogging(conn, NROWS, NROWS, IndexTool.IndexVerifyType.ONLY,
                 IndexTool.IndexDisableLoggingType.NONE,
                 IndexVerificationOutputRepository.PHASE_BEFORE_VALUE,schemaName,
                 dataTableName, indexTableName,
                 indexTableFullName, 0);
             truncateIndexToolTables();
 
-            assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BEFORE,
+            assertDisableLogging(conn, NROWS, 0, IndexTool.IndexVerifyType.BEFORE,
                 IndexTool.IndexDisableLoggingType.BEFORE,
                 null, schemaName,
                 dataTableName, indexTableName,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index a3bba6c..ee41dc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -30,8 +30,10 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
@@ -279,6 +281,7 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
             regionIndex++;
         }
         for (byte[] dataRowKey: dataRowKeys) {
+            indexKey = dataRowKey;
             if (perTaskDataRowKeys.size() == maxSetSize ||
                     (regionIndex < regionCount - 1 && Bytes.BYTES_COMPARATOR.compare(indexKey, endKeys[regionIndex]) > 0)) {
                 perTaskDataRowKeys = new TreeSet<>(Bytes.BYTES_COMPARATOR);
@@ -302,6 +305,33 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
         return dataRowKeys;
     }
 
+    /**
+     * @param indexMutationMap actual index mutations for a page
+     * @param dataRowKeysSetList List of per-task data row keys
+     * @return For each set of data row keys, split the actual index mutation map into
+     * a per-task index mutation map and return the list of all index mutation maps.
+     */
+    private List<Map<byte[], List<Mutation>>> getPerTaskIndexMutationMap(
+            Map<byte[], List<Mutation>> indexMutationMap, List<Set<byte[]>> dataRowKeysSetList) {
+        List<Map<byte[], List<Mutation>>> mapList = Lists.newArrayListWithExpectedSize(dataRowKeysSetList.size());
+        for (int i = 0; i < dataRowKeysSetList.size(); ++i) {
+            Map<byte[], List<Mutation>> perTaskIndexMutationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            mapList.add(perTaskIndexMutationMap);
+        }
+        for (Map.Entry<byte[], List<Mutation>> entry : indexMutationMap.entrySet()) {
+            byte[] indexRowKey = entry.getKey();
+            List<Mutation> actualMutationList = entry.getValue();
+            byte[] dataRowKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRowKey), viewConstants);
+            for (int i = 0; i < dataRowKeysSetList.size(); ++i) {
+                if (dataRowKeysSetList.get(i).contains(dataRowKey)) {
+                    mapList.get(i).put(indexRowKey, actualMutationList);
+                    break;
+                }
+            }
+        }
+        return mapList;
+    }
+
     private void verifyAndOrRepairIndex(Map<byte[], List<Mutation>> actualIndexMutationMap) throws IOException {
         if (actualIndexMutationMap.size() == 0) {
             return;
@@ -309,13 +339,14 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
         Set<byte[]> dataRowKeys = getDataRowKeys(actualIndexMutationMap);
         List<Set<byte[]>> setList = getPerTaskDataRowKeys((TreeSet<byte[]>) dataRowKeys,
                 regionEndKeys, rowCountPerTask);
+        List<Map<byte[], List<Mutation>>> indexMutationMapList = getPerTaskIndexMutationMap(actualIndexMutationMap, setList);
         int taskCount = setList.size();
         TaskBatch<Boolean> tasks = new TaskBatch<>(taskCount);
         List<IndexToolVerificationResult> verificationResultList = new ArrayList<>(taskCount);
         for (int i = 0; i < taskCount; i++) {
             IndexToolVerificationResult perTaskVerificationResult = new IndexToolVerificationResult(scan);
             verificationResultList.add(perTaskVerificationResult);
-            addRepairAndOrVerifyTask(tasks, setList.get(i), actualIndexMutationMap, perTaskVerificationResult);
+            addRepairAndOrVerifyTask(tasks, setList.get(i), indexMutationMapList.get(i), perTaskVerificationResult);
         }
         submitTasks(tasks);
         if (verify) {