You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/12/17 22:14:22 UTC
[phoenix] branch 4.x updated: Merge PHOENIX-6182: IndexTool to
verify and repair every index row (#1022)
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new 3b6ea02 Merge PHOENIX-6182: IndexTool to verify and repair every index row (#1022)
3b6ea02 is described below
commit 3b6ea0240de9d4a0ad4747c3cd43cac5c949cc23
Author: tkhurana <kh...@gmail.com>
AuthorDate: Thu Dec 17 14:12:42 2020 -0800
Merge PHOENIX-6182: IndexTool to verify and repair every index row (#1022)
* PHOENIX-6198 Add option to IndexTool to specify the source table for scan (#937)
* PHOENIX-6198 Add option to IndexTool to specify the source table for scan
* Addressed feedback for PHOENIX-6198
Extended the `-from-index` option to support -vBOTH, -vAFTER and -vNONE.
Added the disclaimer for -vAFTER. Also, using the source table enum from
IndexScrutinyTool.
* PHOENIX-6199 Generate different query plan depending upon if the source (#958)
is index table or data table
* PHOENIX-6200 Add counters for extra index rows, log results to PIT and PIT_RESULT table (#995)
* PHOENIX-6200 Add counters for extra index rows, log results to PIT and PIT_RESULT table
* Address feedback
* PHOENIX-6200 (addendum) Fix test case because invalid rows now are
reported as beyond max lookback when max lookback is set to 0
Also add ASF license to one file
---
.../end2end/IndexRepairRegionScannerIT.java | 691 +++++++++++++++++++--
.../org/apache/phoenix/end2end/IndexToolIT.java | 44 +-
.../PhoenixServerBuildIndexInputFormatIT.java | 105 ++++
.../coprocessor/GlobalIndexRegionScanner.java | 60 +-
.../coprocessor/IndexRepairRegionScanner.java | 35 +-
.../coprocessor/IndexToolVerificationResult.java | 89 ++-
.../PhoenixServerBuildIndexInputFormat.java | 73 ++-
.../apache/phoenix/mapreduce/index/IndexTool.java | 20 +-
.../index/IndexVerificationOutputRepository.java | 1 +
.../index/IndexVerificationResultRepository.java | 23 +-
.../index/PhoenixIndexImportDirectReducer.java | 8 +
.../index/PhoenixIndexToolJobCounters.java | 6 +-
.../mapreduce/util/PhoenixConfigurationUtil.java | 16 +
.../org/apache/phoenix/index/IndexToolTest.java | 36 +-
.../phoenix/index/VerifySingleIndexRowTest.java | 10 +-
.../util/PhoenixConfigurationUtilTest.java | 18 +
16 files changed, 1136 insertions(+), 99 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
index f023a74..5018cdd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.end2end;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -27,27 +30,35 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.phoenix.coprocessor.IndexRepairRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.index.IndexMaintainer;
-import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexTool.IndexDisableLoggingType;
+import org.apache.phoenix.mapreduce.index.IndexTool.IndexVerifyType;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexScrutiny;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -59,15 +70,28 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
-import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_NAME;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+@Category(NeedsOwnMiniClusterTest.class)
@RunWith(Parameterized.class)
public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
@@ -93,12 +117,22 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
{false} });
}
+ @BeforeClass // JUnit 4: runs once, before any test method of this class
+ public static synchronized void doSetup() throws Exception { // builds two config overrides and passes them to setUpTestDriver
+ // below settings are needed to enforce major compaction
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(2); // sized for exactly the two overrides below
+ props.put(ScanInfoUtil.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(0)); // max lookback age set to 0
+ props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); // index row age threshold set to 0
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); // overrides are wrapped read-only before being handed over
+ }
+
@Before
public void createIndexToolTables() throws Exception { // per-test setup: (re)creates the IndexTool tables and clears fail points
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); // private copy so the shared TEST_PROPERTIES is not mutated
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
IndexTool.createIndexToolTables(conn);
}
+ resetIndexRegionObserverFailPoints(); // start every test with all IndexRegionObserver fail points disabled
}
@After
@@ -111,29 +145,7 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
TableName.valueOf(IndexVerificationResultRepository.RESULT_TABLE_NAME));
}
EnvironmentEdgeManager.reset();
- }
-
- private void repairIndex(Connection conn, String schemaName, String dataTableFullName, String indexTableName, IndexTool.IndexVerifyType verifyType) throws Exception {
- PTable pDataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
- PTable pIndexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indexTableName));
- Table hTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
- .getTable(pIndexTable.getPhysicalName().getBytes());
- Scan scan = new Scan();
- scan.setRaw(true);
- scan.setMaxVersions();
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- PhoenixConnection phoenixConnection = conn.unwrap(PhoenixConnection.class);
- IndexMaintainer.serialize(pDataTable, ptr, Collections.singletonList(pIndexTable), phoenixConnection);
- scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE, verifyType.toBytes());
- scan.setAttribute(BaseScannerRegionObserver.PHYSICAL_DATA_TABLE_NAME, pDataTable.getPhysicalName().getBytes());
- scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
- scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, TRUE_BYTES);
- scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
- scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
- scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
- ResultScanner scanner = hTable.getScanner(scan);
- for (Result result = scanner.next(); result != null; result = scanner.next()) {
- }
+ resetIndexRegionObserverFailPoints();
}
private void setIndexRowStatusesToVerified(Connection conn, String dataTableFullName, String indexTableFullName) throws Exception {
@@ -154,6 +166,112 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
}
}
+ private void initTablesAndAddExtraRowsToIndex(Connection conn, String schemaName, String dataTableName, // fixture: data table + index + NROWS index rows with no data row
+ String indexTableName, int NROWS) throws Exception { // NROWS = number of data rows, and also the number of extra index rows added
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions); // tableDDLOptions comes from the enclosing parameterized test class
+ PreparedStatement dataPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
+ for (int i = 1; i <= NROWS; i++) { // data rows: ID = i, VAL1 = i + 1, VAL2 = 2 * i
+ dataPreparedStatement.setInt(1, i);
+ dataPreparedStatement.setInt(2, i + 1);
+ dataPreparedStatement.setInt(3, i * 2);
+ dataPreparedStatement.execute();
+ }
+ conn.commit();
+ conn.createStatement().execute(String.format( // index is created after the data rows were committed
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
+
+ // Add extra index rows
+ PreparedStatement indexPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)"); // writes straight into the index table, bypassing the data table
+
+ for (int i = NROWS + 1; i <= 2 * NROWS; i++) { // ids NROWS+1..2*NROWS were never written to the data table
+ indexPreparedStatement.setInt(1, i + 1); // the indexed column
+ indexPreparedStatement.setInt(2, i); // the data pk column
+ indexPreparedStatement.setInt(3, i * 2); // the included column
+ indexPreparedStatement.execute();
+ }
+ conn.commit();
+
+ // Set all index row statuses to verified so that read verify will not fix them. We want them to be fixed
+ // by IndexRepairRegionScanner
+ setIndexRowStatusesToVerified(conn, dataTableFullName, indexTableFullName);
+ }
+
+ private void truncateIndexToolTables() throws IOException { // disables, then truncates, the IndexTool output and result tables
+ getUtility().getHBaseAdmin().disableTable(TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); // disable precedes truncate
+ getUtility().getHBaseAdmin().truncateTable(TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME), true); // NOTE(review): 'true' is presumably preserveSplits - confirm against the HBase Admin API
+ getUtility().getHBaseAdmin().disableTable(TableName.valueOf(RESULT_TABLE_NAME)); // same sequence for the result table
+ getUtility().getHBaseAdmin().truncateTable(TableName.valueOf(RESULT_TABLE_NAME), true);
+ }
+
+ private void assertExtraCounters(IndexTool indexTool, long extraVerified, long extraUnverified, // asserts the job's "extra index row" counters equal the expected values
+ boolean isBefore) throws IOException { // isBefore: true checks the BEFORE_REPAIR_* counters, false checks AFTER_REPAIR_*
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool); // counters of the job run by the given IndexTool
+
+ if (isBefore) {
+ assertEquals(extraVerified,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(extraUnverified,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ } else {
+ assertEquals(extraVerified,
+ mrJobCounters.findCounter(AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(extraUnverified,
+ mrJobCounters.findCounter(AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ }
+ }
+
+ private void assertDisableLogging(Connection conn, int expectedRows, // runs IndexTool with "-fi" and checks how many output rows it logged
+ IndexTool.IndexVerifyType verifyType,
+ IndexTool.IndexDisableLoggingType disableLoggingType,
+ byte[] expectedPhase, // compared with the phase of the first output row, only when expectedRows > 0
+ String schemaName, String dataTableName,
+ String indexTableName, String indexTableFullName,
+ int expectedStatus) throws Exception { // forwarded to runIndexTool as the expected status
+
+ IndexTool tool = IndexToolIT.runIndexTool(getUtility().getConfiguration(), true, false, schemaName, dataTableName, // NOTE(review): meaning of the 'true, false' flags is defined in IndexToolIT - confirm there
+ indexTableName,
+ null,
+ expectedStatus, verifyType, disableLoggingType, "-fi");
+ assertNotNull(tool);
+ byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
+
+ IndexVerificationOutputRepository outputRepository =
+ new IndexVerificationOutputRepository(indexTableFullNameBytes, conn);
+ List<IndexVerificationOutputRow> rows =
+ outputRepository.getAllOutputRows();
+ try {
+ assertEquals(expectedRows, rows.size());
+ } catch (AssertionError e) { // on a row-count mismatch, dump the output table for diagnosis before failing
+ TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+ throw e; // rethrow so the test still fails with the original assertion
+ }
+ if (expectedRows > 0) { // guard: rows.get(0) is only valid when at least one row is expected
+ assertArrayEquals(expectedPhase, rows.get(0).getPhaseValue());
+ }
+ }
+
+ static private void resetIndexRegionObserverFailPoints() { // turns off all three IndexRegionObserver test fail points
+ IndexRegionObserver.setFailPreIndexUpdatesForTesting(false); // pre-index-update fail point off
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false); // data-table-update fail point off
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); // post-index-update fail point off
+ }
+
+ static private void commitWithException(Connection conn) { // commits and expects the commit to throw; an unexpected success fails the test
+ try {
+ conn.commit();
+ resetIndexRegionObserverFailPoints(); // reached only if commit unexpectedly succeeded; on the expected failure path the fail points stay as the caller set them
+ fail(); // throws AssertionError, which is not an Exception, so the catch below does not swallow it
+ } catch (Exception e) {
+ // this is expected
+ }
+ }
+
@Test
public void testRepairExtraIndexRows() throws Exception {
final int NROWS = 20;
@@ -163,12 +281,55 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
String indexTableName = generateUniqueName();
String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS);
+
+ // do index rebuild without -fi and check with scrutiny that index tool failed to fix the extra rows
+ IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BEFORE);
+
+ boolean failed;
+ try {
+ IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ failed = false;
+ } catch (AssertionError e) {
+ failed = true;
+ }
+ assertTrue(failed);
+
+ // now repair the index with -fi
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi");
+
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+
+ assertExtraCounters(indexTool, NROWS, 0, true);
+ }
+ }
+
+ @Test
+ public void testRepairExtraIndexRows_PostIndexUpdateFailure_overwrite() throws Exception {
+ if (!mutable) {
+ return;
+ }
+ final int NROWS = 4;
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.createStatement().execute("CREATE TABLE " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
- + tableDDLOptions);
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions);
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
+
PreparedStatement dataPreparedStatement =
- conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
+ conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
for (int i = 1; i <= NROWS; i++) {
dataPreparedStatement.setInt(1, i);
dataPreparedStatement.setInt(2, i + 1);
@@ -176,22 +337,192 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
dataPreparedStatement.execute();
}
conn.commit();
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ conn.createStatement().execute("UPSERT INTO " + dataTableFullName + " VALUES(3, 100, 200)");
+ conn.commit();
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi");
+
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+
+ assertEquals(2,
+ mrJobCounters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(2,
+ mrJobCounters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+
+ indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.ONLY, "-fi");
+ mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ }
+ }
+
+ @Test
+ public void testRepairExtraIndexRows_PostIndexUpdateFailure_delete() throws Exception {
+ if (!mutable) {
+ return;
+ }
+ final int NROWS = 4;
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions);
conn.createStatement().execute(String.format(
- "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
- // Add extra index rows
- PreparedStatement indexPreparedStatement =
- conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)");
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
- for (int i = NROWS + 1; i <= 2 * NROWS; i++) {
- indexPreparedStatement.setInt(1, i + 1); // the indexed column
- indexPreparedStatement.setInt(2, i); // the data pk column
- indexPreparedStatement.setInt(3, i * 2); // the included column
- indexPreparedStatement.execute();
+ PreparedStatement dataPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
+ for (int i = 1; i <= NROWS; i++) {
+ dataPreparedStatement.setInt(1, i);
+ dataPreparedStatement.setInt(2, i + 1);
+ dataPreparedStatement.setInt(3, i * 2);
+ dataPreparedStatement.execute();
}
conn.commit();
- // Set all index row statuses to verified so that read verify will not fix them. We want them to be fixed
- // by IndexRepairRegionScanner
- setIndexRowStatusesToVerified(conn, dataTableFullName, indexTableFullName);
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ conn.createStatement().execute("DELETE FROM " + dataTableFullName + " WHERE ID = 3");
+ conn.commit();
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ TestUtil.doMajorCompaction(conn, dataTableFullName);
+
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi");
+
+ CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(1,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+
+ indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.ONLY, "-fi");
+ mrJobCounters = IndexToolIT.getMRJobCounters(indexTool);
+
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS - 1, actualRowCount);
+ }
+ }
+
+ @Test
+ public void testRepairExtraIndexRows_DataTableUpdateFailure() throws Exception { // failed data-table writes leave index rows behind; IndexTool -fi must count and remove them
+ if (!mutable) { // scenario is only exercised for the mutable parameterization
+ return;
+ }
+ final int NROWS = 20;
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions);
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
+
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true); // arm the fail point so the commit below fails at the data-table write
+
+ PreparedStatement dataPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
+ for (int i = 1; i <= NROWS; i++) { // rows: ID = i, VAL1 = i + 1, VAL2 = 2 * i
+ dataPreparedStatement.setInt(1, i);
+ dataPreparedStatement.setInt(2, i + 1);
+ dataPreparedStatement.setInt(3, i * 2);
+ dataPreparedStatement.execute();
+ }
+ commitWithException(conn); // the commit is expected to throw while the fail point is armed
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false); // disarm the fail point before running IndexTool (was re-set to true, a no-op that left it armed)
+
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BEFORE, "-fi");
+
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(0, actualRowCount); // no data rows were committed, so scrutiny must find none after the repair
+
+ assertExtraCounters(indexTool, 0, NROWS, true); // expects NROWS extra unverified and 0 extra verified rows in the BEFORE_REPAIR counters
+ }
+ }
+
+ @Test
+ public void testPITRow() throws Exception { // verify-only run must log an "extra index row" error message for the one extra index row
+ final int NROWS = 1; // one data row and one extra index row
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS);
+
+ IndexToolIT.runIndexTool(false, false, schemaName, dataTableName, // IndexVerifyType.ONLY with "-fi"; expected status 0
+ indexTableName, null, 0, IndexVerifyType.ONLY, "-fi");
+
+ Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName); // cell holding the logged error message
+ String expectedErrorMsg = IndexRepairRegionScanner.ERROR_MESSAGE_EXTRA_INDEX_ROW;
+ String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); // decode only the value slice of the cell's backing array
+ assertTrue(actualErrorMsg.contains(expectedErrorMsg)); // substring match, not equality
+ }
+ }
+
+ @Test
+ public void testVerifyAfterExtraIndexRows() throws Exception {
+ final int NROWS = 20;
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS);
+
+ // Run -v AFTER and check it doesn't fix the extra rows and the job fails
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, -1, IndexVerifyType.AFTER, "-fi");
+
boolean failed;
try {
IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
@@ -200,11 +531,275 @@ public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT {
failed = true;
}
assertTrue(failed);
- // Repair the index
- repairIndex(conn, schemaName, dataTableFullName, indexTableName, IndexTool.IndexVerifyType.BEFORE);
- long actualRowCount = IndexScrutiny
- .scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+
+ // job failed so no counters are output
+ }
+ }
+
+ @Test
+ public void testVerifyBothExtraIndexRows() throws Exception {
+ final int NROWS = 20;
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS);
+
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BOTH, "-fi");
+
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+
+ assertExtraCounters(indexTool, 0, 0, false);
+ }
+ }
+
+ @Test
+ public void testOverrideIndexRebuildPageSizeFromIndexTool() throws Exception {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ final int NROWS = 20;
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ initTablesAndAddExtraRowsToIndex(conn, schemaName, dataTableName, indexTableName, NROWS);
+
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ conf.set(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(2));
+ IndexTool indexTool = IndexToolIT.runIndexTool(conf,false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.BEFORE, IndexDisableLoggingType.NONE,"-fi");
+
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
assertEquals(NROWS, actualRowCount);
+
+ assertExtraCounters(indexTool, NROWS, 0, true);
+ }
+ }
+
+ @Test
+ public void testViewIndexExtraRows() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String viewName = generateUniqueName();
+ String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+ String indexTableName1 = generateUniqueName();
+ String indexTableFullName1 = SchemaUtil.getTableName(schemaName, indexTableName1);
+ String indexTableName2 = generateUniqueName();
+ String indexTableFullName2 = SchemaUtil.getTableName(schemaName, indexTableName2);
+
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+ + tableDDLOptions);
+ conn.commit();
+ conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+ conn.commit();
+ // Insert a row
+ conn.createStatement().execute("UPSERT INTO " + viewFullName + " values (1, 2, 4)");
+ conn.commit();
+
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName1, viewFullName));
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL2) INCLUDE (VAL1)", indexTableName2, viewFullName));
+
+ // directly insert a row into index
+ conn.createStatement().execute("UPSERT INTO " + indexTableFullName1 + " VALUES (4, 2, 8)");
+ conn.createStatement().execute("UPSERT INTO " + indexTableFullName2 + " VALUES (8, 2, 4)");
+ conn.commit();
+ setIndexRowStatusesToVerified(conn, viewFullName, indexTableFullName1);
+
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, false, schemaName, viewName,
+ indexTableName1, null, 0, IndexVerifyType.BEFORE, "-fi");
+ assertExtraCounters(indexTool, 1, 0, true);
+
+ indexTool = IndexToolIT.runIndexTool(false, false, schemaName, viewName,
+ indexTableName2, null, 0, IndexVerifyType.BEFORE, "-fi");
+ assertExtraCounters(indexTool, 1, 0, true);
+
+ String indexTablePhysicalName = "_IDX" + dataTableFullName;
+ byte[] indexTableFullNameBytes = Bytes.toBytes(indexTablePhysicalName);
+ IndexVerificationOutputRepository outputRepository =
+ new IndexVerificationOutputRepository(indexTableFullNameBytes, conn);
+ List<IndexVerificationOutputRow> rows =
+ outputRepository.getAllOutputRows();
+ try {
+ assertEquals(2, rows.size());
+ } catch (AssertionError e) {
+ TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+ throw e;
+ }
+ }
+ }
+
+ @Test
+ public void testFromIndexToolForIncrementalVerify() throws Exception {
+ final int NROWS = 4;
+ ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge();
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ long delta = 2;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ long t0 = EnvironmentEdgeManager.currentTimeMillis();
+ customEdge.setValue(t0);
+ EnvironmentEdgeManager.injectEdge(customEdge);
+
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions);
+ PreparedStatement dataPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
+ for (int i = 1; i <= NROWS; i++) {
+ dataPreparedStatement.setInt(1, i);
+ dataPreparedStatement.setInt(2, i + 1);
+ dataPreparedStatement.setInt(3, i * 2);
+ dataPreparedStatement.execute();
+ }
+ conn.commit();
+
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
+
+ customEdge.incrementValue(delta);
+ long t1 = customEdge.currentTime();
+
+ IndexTool it;
+ it = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.ONLY,
+ "-fi", "-st", String.valueOf(t0), "-et", String.valueOf(t1));
+
+ CounterGroup mrJobCounters;
+ mrJobCounters = IndexToolIT.getMRJobCounters(it);
+ assertEquals(NROWS,
+ mrJobCounters.findCounter(SCANNED_DATA_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+
+ // Add extra index rows
+ PreparedStatement indexPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)");
+ for (int i = NROWS + 1; i <= 2 * NROWS; i++) {
+ indexPreparedStatement.setInt(1, i + 1); // the indexed column
+ indexPreparedStatement.setInt(2, i); // the data pk column
+ indexPreparedStatement.setInt(3, i * 2); // the included column
+ indexPreparedStatement.execute();
+ }
+ conn.commit();
+
+ // Set all index row statuses to verified so that read verify will not fix them.
+ // We want them to be fixed by IndexRepairRegionScanner
+ setIndexRowStatusesToVerified(conn, dataTableFullName, indexTableFullName);
+ customEdge.incrementValue(delta);
+ long t2 = customEdge.currentTime();
+ it = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.ONLY,
+ "-fi", "-st", String.valueOf(t1), "-et", String.valueOf(t2));
+
+ // incremental verification should only scan NROWS instead of total 2*NROWS
+ mrJobCounters = IndexToolIT.getMRJobCounters(it);
+ assertEquals(NROWS,
+ mrJobCounters.findCounter(SCANNED_DATA_ROW_COUNT.name()).getValue());
+ assertEquals(NROWS,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+
+ // now run another verification over the entire window [t0, t2]
+ it = IndexToolIT.runIndexTool(false, false, schemaName, dataTableName,
+ indexTableName, null, 0, IndexVerifyType.ONLY,
+ "-fi", "-st", String.valueOf(t0), "-et", String.valueOf(t2));
+
+ mrJobCounters = IndexToolIT.getMRJobCounters(it);
+ assertEquals(2*NROWS,
+ mrJobCounters.findCounter(SCANNED_DATA_ROW_COUNT.name()).getValue());
+ assertEquals(NROWS,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ assertEquals(0,
+ mrJobCounters.findCounter(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT.name()).getValue());
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ @Test
+ public void testDisableOutputLogging() throws Exception {
+ if (!mutable) {
+ return;
+ }
+ final int NROWS = 4;
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) " + tableDDLOptions);
+ PreparedStatement dataPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)");
+ for (int i = 1; i <= NROWS; i++) {
+ dataPreparedStatement.setInt(1, i);
+ dataPreparedStatement.setInt(2, i + 1);
+ dataPreparedStatement.setInt(3, i * 2);
+ dataPreparedStatement.execute();
+ }
+ conn.commit();
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
+
+ // Add extra index rows
+ PreparedStatement indexPreparedStatement =
+ conn.prepareStatement("UPSERT INTO " + indexTableFullName + " VALUES(?,?,?)");
+ for (int i = NROWS + 1; i <= 2 * NROWS; i++) {
+ indexPreparedStatement.setInt(1, i + 1); // the indexed column
+ indexPreparedStatement.setInt(2, i); // the data pk column
+ indexPreparedStatement.setInt(3, i * 2); // the included column
+ indexPreparedStatement.execute();
+ }
+ conn.commit();
+
+ // Set all index row statuses to verified so that read verify will not fix them.
+ // We want them to be fixed by IndexRepairRegionScanner
+ setIndexRowStatusesToVerified(conn, dataTableFullName, indexTableFullName);
+
+ // Run the index MR job as ONLY so the index doesn't get rebuilt. There should be NROWS
+ // extra index rows. We pass in --disable-logging BEFORE to silence the output logging to
+ // PHOENIX_INDEX_TOOL
+ assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.ONLY,
+ IndexTool.IndexDisableLoggingType.BEFORE, null, schemaName, dataTableName, indexTableName,
+ indexTableFullName, 0);
+ truncateIndexToolTables();
+
+ // logging to PHOENIX_INDEX_TOOL enabled
+ assertDisableLogging(conn, NROWS, IndexTool.IndexVerifyType.ONLY,
+ IndexTool.IndexDisableLoggingType.NONE,
+ IndexVerificationOutputRepository.PHASE_BEFORE_VALUE,schemaName,
+ dataTableName, indexTableName,
+ indexTableFullName, 0);
+ truncateIndexToolTables();
+
+ assertDisableLogging(conn, 0, IndexTool.IndexVerifyType.BEFORE,
+ IndexTool.IndexDisableLoggingType.BEFORE,
+ null, schemaName,
+ dataTableName, indexTableName,
+ indexTableFullName, 0);
}
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index c56e01f..e6941b2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -26,10 +26,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -42,13 +39,14 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
-import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRow;
import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
+import org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters;
import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
@@ -56,7 +54,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -86,15 +83,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS;
-import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS;
import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
@@ -636,19 +625,23 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
actualExplainPlan.contains(expectedExplainPlan));
}
+ public static CounterGroup getMRJobCounters(IndexTool indexTool) throws IOException {
+ return indexTool.getJob().getCounters().getGroup(PhoenixIndexToolJobCounters.class.getName());
+ }
+
private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indxTable, String tenantId,
IndexTool.IndexVerifyType verifyType, Long startTime,
Long endTime, Long incrementalVerify) {
return getArgList(directApi, useSnapshot, schemaName, dataTable, indxTable, tenantId,
- verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify);
+ verifyType, startTime, endTime, IndexTool.IndexDisableLoggingType.NONE, incrementalVerify, false);
}
private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
String dataTable, String indxTable, String tenantId,
IndexTool.IndexVerifyType verifyType, Long startTime, Long endTime,
IndexTool.IndexDisableLoggingType disableLoggingType,
- Long incrementalVerify) {
+ Long incrementalVerify, boolean useIndexTableAsSource) {
List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
@@ -692,6 +685,11 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
args.add("-rv");
args.add(String.valueOf(incrementalVerify));
}
+
+ if (useIndexTableAsSource) {
+ args.add("-fi");
+ }
+
args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
return args;
@@ -708,7 +706,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType,
IndexTool.IndexDisableLoggingType disableLoggingType) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
- tenantId, verifyType, null, null, disableLoggingType, null);
+ tenantId, verifyType, null, null, disableLoggingType, null, false);
return args.toArray(new String[0]);
}
@@ -727,7 +725,19 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
IndexTool.IndexDisableLoggingType disableLoggingType,
Long incrementalVerify ) {
List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
- tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify);
+ tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, false);
+ return args.toArray(new String[0]);
+ }
+
+ public static String [] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
+ String dataTable, String indexTable, String tenantId,
+ IndexTool.IndexVerifyType verifyType, Long startTime,
+ Long endTime,
+ IndexTool.IndexDisableLoggingType disableLoggingType,
+ Long incrementalVerify,
+ boolean useIndexTableAsSource) {
+ List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable,
+ tenantId, verifyType, startTime, endTime, disableLoggingType, incrementalVerify, useIndexTableAsSource);
return args.toArray(new String[0]);
}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormatIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormatIT.java
new file mode 100644
index 0000000..0110488
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormatIT.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+
+public class PhoenixServerBuildIndexInputFormatIT extends ParallelStatsDisabledIT {
+
+ @Test
+ public void testQueryPlanWithSource() throws Exception {
+ PhoenixServerBuildIndexInputFormat inputFormat;
+ Configuration conf = new Configuration(getUtility().getConfiguration());
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ String viewName = generateUniqueName();
+ String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+ String viewIndexName = generateUniqueName();
+ String viewIndexFullName = SchemaUtil.getTableName(schemaName, viewIndexName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) ");
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", indexTableName, dataTableFullName));
+ conn.createStatement().execute("CREATE VIEW " + viewFullName +
+ " AS SELECT * FROM " + dataTableFullName);
+ conn.commit();
+
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2)", viewIndexName, viewFullName));
+
+ PhoenixConfigurationUtil.setIndexToolDataTableName(conf, dataTableFullName);
+ PhoenixConfigurationUtil.setIndexToolIndexTableName(conf, indexTableFullName);
+ // use data table as source (default)
+ assertTableSource(conf, conn);
+
+ // use index table as source
+ PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.INDEX_TABLE_SOURCE);
+ assertTableSource(conf, conn);
+
+ PhoenixConfigurationUtil.setIndexToolDataTableName(conf, viewFullName);
+ PhoenixConfigurationUtil.setIndexToolIndexTableName(conf, viewIndexFullName);
+ PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.DATA_TABLE_SOURCE);
+
+ assertTableSource(conf, conn);
+
+ PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.INDEX_TABLE_SOURCE);
+ assertTableSource(conf, conn);
+ }
+ }
+
+ private void assertTableSource(Configuration conf, Connection conn) throws Exception {
+ String dataTableFullName = PhoenixConfigurationUtil.getIndexToolDataTableName(conf);
+ String indexTableFullName = PhoenixConfigurationUtil.getIndexToolIndexTableName(conf);
+ SourceTable sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
+ boolean fromIndex = sourceTable.equals(SourceTable.INDEX_TABLE_SOURCE);
+ PTable pDataTable = PhoenixRuntime.getTable(conn, dataTableFullName);
+ PTable pIndexTable = PhoenixRuntime.getTable(conn, indexTableFullName);
+
+ PhoenixServerBuildIndexInputFormat inputFormat = new PhoenixServerBuildIndexInputFormat();
+ QueryPlan queryPlan = inputFormat.getQueryPlan(Job.getInstance(), conf);
+ PTable actual = queryPlan.getTableRef().getTable();
+
+ if (!fromIndex) {
+ assertEquals(pDataTable, actual);
+ } else {
+ assertEquals(pIndexTable, actual);
+ }
+ }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 60ab3f2..cc2772d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -89,6 +89,7 @@ import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCo
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_CELLS;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.EXTRA_ROW;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.INVALID_ROW;
import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.MISSING_ROW;
import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
@@ -111,6 +112,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack";
public static final String ERROR_MESSAGE_MISSING_INDEX_ROW = "Missing index row";
+ public static final String ERROR_MESSAGE_EXTRA_INDEX_ROW = "Extra index row";
public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS =
"phoenix.index.mr.log.beyond.max.lookback.errors";
public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false;
@@ -327,6 +329,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
byte[] qualifier = CellUtil.cloneQualifier(cell);
return set.contains(qualifier);
}
+
@VisibleForTesting
public boolean shouldVerify(IndexTool.IndexVerifyType verifyType,
byte[] indexRowKey, Scan scan, Region region, IndexMaintainer indexMaintainer,
@@ -378,6 +381,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
dataHTable.close();
}
}
+
@Override
public void close() throws IOException {
innerScanner.close();
@@ -433,14 +437,19 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
null, isBeforeRebuild, errorType);
}
+ protected byte[] getDataTableName() {
+ return region.getRegionInfo().getTable().getName();
+ }
+
@VisibleForTesting
public void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
String errorMsg, byte[] expectedVaue, byte[] actualValue, boolean isBeforeRebuild,
IndexVerificationOutputRepository.IndexVerificationErrorType errorType) throws IOException {
ungroupedAggregateRegionObserver.checkForRegionClosing();
+ byte[] dataTableName = getDataTableName();
verificationOutputRepository.logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
errorMsg, expectedVaue, actualValue, scan.getTimeRange().getMax(),
- region.getRegionInfo().getTable().getName(), isBeforeRebuild, errorType);
+ dataTableName, isBeforeRebuild, errorType);
}
private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
@@ -628,6 +637,37 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
}
/**
+ * actualIndexMutationList is the list of all the mutations of a single extra index row (i.e. one not referenced by any data row),
+ * in decreasing order of timestamp, with Deletes before Puts
+ */
+ private void logExtraIndexRowAndUpdateCounters(List<Mutation> actualIndexMutationList,
+ IndexToolVerificationResult.PhaseResult verificationPhaseResult, boolean isBeforeRebuild) throws IOException {
+ for (Mutation m : actualIndexMutationList) {
+ // this extra row in the index table has already been deleted
+ if ((m instanceof Delete)) {
+ return;
+ }
+
+ // check the empty column status of latest (most recent) put mutation
+ if (isVerified((Put) m)) {
+ verificationPhaseResult.setExtraVerifiedIndexRowCount(
+ verificationPhaseResult.getExtraVerifiedIndexRowCount() + 1);
+ } else {
+ verificationPhaseResult.setExtraUnverifiedIndexRowCount(
+ verificationPhaseResult.getExtraUnverifiedIndexRowCount() + 1);
+ }
+
+ byte[] indexKey = m.getRow();
+ byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexKey), viewConstants);
+ String errorMsg = ERROR_MESSAGE_EXTRA_INDEX_ROW;
+ IndexVerificationOutputRepository.IndexVerificationErrorType errorType = EXTRA_ROW;
+ logToIndexToolOutputTable(dataKey, indexKey, 0, getTimestamp(m), errorMsg,
+ isBeforeRebuild, errorType);
+ break;
+ }
+ }
+
+ /**
* In this method, the actual list is repaired in memory using the expected list which is actually the output of
* rebuilding the index table row. The result of this repair is used only for verification.
*/
@@ -771,6 +811,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
if (actualMutationList == null || actualMutationList.isEmpty()) {
throw new DoNotRetryIOException(ACTUAL_MUTATION_IS_NULL_OR_EMPTY);
}
+
if (isBeforeRebuild) {
Mutation m = actualMutationList.get(0);
if (m instanceof Put && (mostRecentIndexRowKeys.isEmpty() || mostRecentIndexRowKeys.contains(m.getRow()))) {
@@ -782,7 +823,9 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
if (verifyType == IndexTool.IndexVerifyType.ONLY) {
repairActualMutationList(actualMutationList, expectedMutationList);
}
+ // actualMutationList can be empty after cleanUpActualMutationList returns
cleanUpActualMutationList(actualMutationList);
+
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
int actualIndex = 0;
int expectedIndex = 0;
@@ -887,6 +930,11 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
logMismatch(expected, actual, expectedIndex, verificationPhaseResult, isBeforeRebuild);
}
else {
+ if (expected == null) {
+ // Happens when the actualMutationList becomes empty after returning from
+ // the cleanUpActualMutationList function.
+ expected = expectedMutationList.get(0);
+ }
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(indexRowKey), viewConstants);
String errorMsg = String.format("Not matching index row. expectedIndex=%d. expectedMutationSize=%d. actualIndex=%d. actualMutationSize=%d. expectedType=%s. actualType=%s",
expectedIndex, expectedSize, actualIndex, actualSize, expected.getClass().getName(), (actualIndex < actualSize ? actual.getClass().getName() : "null"));
@@ -913,10 +961,11 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
if (expectedMutationList != null) {
if (!verifySingleIndexRow(entry.getKey(), entry.getValue(), expectedMutationList, mostRecentIndexRowKeys,
indexRowsToBeDeleted, verificationPhaseResult, isBeforeRebuild)) {
- invalidIndexRows.put(indexRowKey, actualIndexMutationMap.get(indexRowKey));
+ invalidIndexRows.put(indexRowKey, expectedMutationList);
}
expectedIndexMutationMap.remove(indexRowKey);
} else {
+ logExtraIndexRowAndUpdateCounters(entry.getValue(), verificationPhaseResult, isBeforeRebuild);
indexRowsToBeDeleted.add(indexMaintainer.buildRowDeleteMutation(indexRowKey,
IndexMaintainer.DeleteType.ALL_VERSIONS, getTimestamp(entry.getValue().get(0))));
}
@@ -969,6 +1018,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
logToIndexToolOutputTable(dataKey, indexKey, getTimestamp(mutation), 0, errorMsg,
isBeforeRebuild, errorType);
}
+
// Leave the invalid and missing rows in indexMutationMap
expectedIndexMutationMap.putAll(invalidIndexRows);
}
@@ -1014,7 +1064,11 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
verificationResult.setRebuiltIndexRowCount(verificationResult.getRebuiltIndexRowCount() + indexMutationMap.size());
}
} catch (Throwable t) {
- ServerUtil.throwIOException(indexHTable.getName().toString(), t);
+ if (indexHTable != null) {
+ ServerUtil.throwIOException(indexHTable.getName().toString(), t);
+ } else {
+ ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index 39295b3..5e69925 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -91,6 +91,11 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
}
}
+ /** Returns the name of {@code dataHTable}, converted to a byte array. */
+ @Override
+ public byte[] getDataTableName() {
+ return dataHTable.getName().toBytes();
+ }
+
public void prepareExpectedIndexMutations(Result dataRow, Map<byte[], List<Mutation>> expectedIndexMutationMap) throws IOException {
Put put = null;
Delete del = null;
@@ -178,6 +183,28 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
return actualIndexMutationMap;
}
+    /**
+     * Scans this region and rebuilds the map of actual index mutations from the rows found.
+     * <p>
+     * The scan is raw, returns all versions, bypasses the block cache and is limited to the
+     * time range of the enclosing {@code scan}. Every non-empty row is handed to
+     * {@code populateIndexMutationFromIndexRow} together with the result map.
+     *
+     * @return a map ordered by {@code Bytes.BYTES_COMPARATOR} holding whatever
+     *         {@code populateIndexMutationFromIndexRow} stored for the scanned rows
+     * @throws IOException any failure raised while scanning is rethrown through
+     *         {@code ServerUtil.throwIOException}, tagged with the region name
+     */
+    private Map<byte[], List<Mutation>> populateActualIndexMutationMap() throws IOException {
+        Map<byte[], List<Mutation>> actualIndexMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        Scan indexScan = new Scan();
+        indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        indexScan.setRaw(true);
+        indexScan.setMaxVersions();
+        indexScan.setCacheBlocks(false);
+        try (RegionScanner regionScanner = region.getScanner(indexScan)) {
+            // Keep the loop flag local. The previous code wrote to hasMore, which is declared
+            // outside this method; this helper scan always ends with the flag set to false,
+            // so it overwrote that outside state for every other reader.
+            boolean moreRows;
+            do {
+                ungroupedAggregateRegionObserver.checkForRegionClosing();
+                List<Cell> row = new ArrayList<Cell>();
+                moreRows = regionScanner.nextRaw(row);
+                if (!row.isEmpty()) {
+                    populateIndexMutationFromIndexRow(row, actualIndexMutationMap);
+                }
+            } while (moreRows);
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
+        }
+        return actualIndexMutationMap;
+    }
+
private void repairAndOrVerifyIndexRows(Set<byte[]> dataRowKeys,
Map<byte[], List<Mutation>> actualIndexMutationMap,
IndexToolVerificationResult verificationResult) throws IOException {
@@ -188,7 +215,7 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
return;
}
if (verifyType == IndexTool.IndexVerifyType.ONLY) {
- verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, Collections.EMPTY_LIST, verificationResult.getBefore(), true);
+ verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true);
return;
}
if (verifyType == IndexTool.IndexVerifyType.BEFORE) {
@@ -200,11 +227,12 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
}
if (verifyType == IndexTool.IndexVerifyType.AFTER) {
repairIndexRows(expectedIndexMutationMap, Collections.EMPTY_LIST, verificationResult);
- verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, Collections.EMPTY_LIST, verificationResult.getAfter(), false);
+ actualIndexMutationMap = populateActualIndexMutationMap();
+ verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getAfter(), false);
return;
}
if (verifyType == IndexTool.IndexVerifyType.BOTH) {
- verifyIndexRows(actualIndexMutationMap,expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true);
+ verifyIndexRows(actualIndexMutationMap, expectedIndexMutationMap, Collections.EMPTY_SET, indexRowsToBeDeleted, verificationResult.getBefore(), true);
if (!expectedIndexMutationMap.isEmpty() || !indexRowsToBeDeleted.isEmpty()) {
repairIndexRows(expectedIndexMutationMap, indexRowsToBeDeleted, verificationResult);
}
@@ -215,6 +243,7 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
}
}
+
private void addRepairAndOrVerifyTask(TaskBatch<Boolean> tasks,
final Set<byte[]> dataRowKeys,
final Map<byte[], List<Mutation>> actualIndexMutationMap,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
index 2e02263..5919e8f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexToolVerificationResult.java
@@ -30,6 +30,8 @@ import static org.apache.phoenix.mapreduce.index.IndexVerificationResultReposito
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES;
@@ -41,6 +43,8 @@ import static org.apache.phoenix.mapreduce.index.IndexVerificationResultReposito
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES;
+import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.REBUILT_INDEX_ROW_COUNT_BYTES;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.RESULT_TABLE_COLUMN_FAMILY;
import static org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository.SCANNED_DATA_ROW_COUNT_BYTES;
@@ -123,6 +127,8 @@ public class IndexToolVerificationResult {
private long unverifiedIndexRowCount = 0;
private long oldIndexRowCount = 0;
private long unknownIndexRowCount = 0;
+ private long extraVerifiedIndexRowCount = 0;
+ private long extraUnverifiedIndexRowCount = 0;
public void add(PhaseResult phaseResult) {
setBeyondMaxLookBackMissingIndexRowCount(getBeyondMaxLookBackMissingIndexRowCount() +
@@ -138,16 +144,21 @@ public class IndexToolVerificationResult {
setUnverifiedIndexRowCount(getUnverifiedIndexRowCount() + phaseResult.getUnverifiedIndexRowCount());
setUnknownIndexRowCount(getUnknownIndexRowCount() + phaseResult.getUnknownIndexRowCount());
setOldIndexRowCount(getOldIndexRowCount() + phaseResult.getOldIndexRowCount());
+ setExtraVerifiedIndexRowCount(getExtraVerifiedIndexRowCount() +
+ phaseResult.getExtraVerifiedIndexRowCount());
+ setExtraUnverifiedIndexRowCount(getExtraUnverifiedIndexRowCount() +
+ phaseResult.getExtraUnverifiedIndexRowCount());
}
public PhaseResult() {
}
public PhaseResult(long validIndexRowCount, long expiredIndexRowCount,
- long missingIndexRowCount, long invalidIndexRowCount,
- long beyondMaxLookBackMissingIndexRowCount,
- long beyondMaxLookBackInvalidIndexRowCount,
- long indexHasExtraCellsCount, long indexHasMissingCellsCount) {
+ long missingIndexRowCount, long invalidIndexRowCount,
+ long beyondMaxLookBackMissingIndexRowCount,
+ long beyondMaxLookBackInvalidIndexRowCount,
+ long indexHasExtraCellsCount, long indexHasMissingCellsCount,
+ long extraVerifiedIndexRowCount, long extraUnverifiedIndexRowCount) {
this.setValidIndexRowCount(validIndexRowCount);
this.setExpiredIndexRowCount(expiredIndexRowCount);
this.setMissingIndexRowCount(missingIndexRowCount);
@@ -156,6 +167,8 @@ public class IndexToolVerificationResult {
this.setBeyondMaxLookBackMissingIndexRowCount(beyondMaxLookBackMissingIndexRowCount);
this.setIndexHasExtraCellsCount(indexHasExtraCellsCount);
this.setIndexHasMissingCellsCount(indexHasMissingCellsCount);
+ this.setExtraVerifiedIndexRowCount(extraVerifiedIndexRowCount);
+ this.setExtraUnverifiedIndexRowCount(extraUnverifiedIndexRowCount);
}
@@ -172,6 +185,10 @@ public class IndexToolVerificationResult {
return indexHasMissingCellsCount;
}
+        /**
+         * Returns the sum of the extra verified and the extra unverified index row counts.
+         */
+        public long getTotalExtraIndexRowsCount() {
+            final long extraVerified = getExtraVerifiedIndexRowCount();
+            final long extraUnverified = getExtraUnverifiedIndexRowCount();
+            return extraVerified + extraUnverified;
+        }
+
@Override
public String toString() {
return "PhaseResult{" +
@@ -184,6 +201,11 @@ public class IndexToolVerificationResult {
", beyondMaxLookBackInvalidIndexRowCount=" + getBeyondMaxLookBackInvalidIndexRowCount() +
", extraCellsOnIndexCount=" + indexHasExtraCellsCount +
", missingCellsOnIndexCount=" + indexHasMissingCellsCount +
+ ", unverifiedIndexRowCount=" + unverifiedIndexRowCount +
+ ", oldIndexRowCount=" + oldIndexRowCount +
+ ", unknownIndexRowCount=" + unknownIndexRowCount +
+ ", extraVerifiedIndexRowCount=" + extraVerifiedIndexRowCount +
+ ", extraUnverifiedIndexRowCount=" + extraUnverifiedIndexRowCount +
'}';
}
@@ -201,10 +223,14 @@ public class IndexToolVerificationResult {
&& this.validIndexRowCount == pr.validIndexRowCount
&& this.invalidIndexRowCount == pr.invalidIndexRowCount
&& this.missingIndexRowCount == pr.missingIndexRowCount
- && this.getBeyondMaxLookBackInvalidIndexRowCount() == pr.getBeyondMaxLookBackInvalidIndexRowCount()
- && this.getBeyondMaxLookBackMissingIndexRowCount() == pr.getBeyondMaxLookBackMissingIndexRowCount()
+ && this.beyondMaxLookBackInvalidIndexRowCount == pr.beyondMaxLookBackInvalidIndexRowCount
+ && this.beyondMaxLookBackMissingIndexRowCount== pr.beyondMaxLookBackMissingIndexRowCount
&& this.indexHasMissingCellsCount == pr.indexHasMissingCellsCount
- && this.indexHasExtraCellsCount == pr.indexHasExtraCellsCount;
+ && this.indexHasExtraCellsCount == pr.indexHasExtraCellsCount
+ && this.oldIndexRowCount == pr.oldIndexRowCount
+ && this.unknownIndexRowCount == pr.unknownIndexRowCount
+ && this.extraVerifiedIndexRowCount == pr.extraVerifiedIndexRowCount
+ && this.extraUnverifiedIndexRowCount == pr.extraUnverifiedIndexRowCount;
}
@Override
@@ -221,6 +247,8 @@ public class IndexToolVerificationResult {
result = 31 * result + getUnverifiedIndexRowCount();
result = 31 * result + getOldIndexRowCount();
result = 31 * result + getUnknownIndexRowCount();
+ result = 31 * result + getExtraVerifiedIndexRowCount();
+ result = 31 * result + getExtraUnverifiedIndexRowCount();
return (int) result;
}
@@ -303,6 +331,18 @@ public class IndexToolVerificationResult {
public void setUnknownIndexRowCount(long unknownIndexRowCount) {
this.unknownIndexRowCount = unknownIndexRowCount;
}
+
+        /** Returns the current value of the {@code extraVerifiedIndexRowCount} counter. */
+        public long getExtraVerifiedIndexRowCount() {
+            return this.extraVerifiedIndexRowCount;
+        }
+
+        /** Replaces the {@code extraVerifiedIndexRowCount} counter with the given value. */
+        public void setExtraVerifiedIndexRowCount(long extraVerifiedIndexRowCount) {
+            this.extraVerifiedIndexRowCount = extraVerifiedIndexRowCount;
+        }
+
+        /** Returns the current value of the {@code extraUnverifiedIndexRowCount} counter. */
+        public long getExtraUnverifiedIndexRowCount() {
+            return this.extraUnverifiedIndexRowCount;
+        }
+
+        /** Replaces the {@code extraUnverifiedIndexRowCount} counter with the given value. */
+        public void setExtraUnverifiedIndexRowCount(long extraUnverifiedIndexRowCount) {
+            this.extraUnverifiedIndexRowCount = extraUnverifiedIndexRowCount;
+        }
}
private long scannedDataRowCount = 0;
@@ -372,6 +412,10 @@ public class IndexToolVerificationResult {
public long getBeforeIndexHasExtraCellsCount() {return getBefore().getIndexHasExtraCellsCount(); }
+    /** Returns the extra verified index row count held by the result of {@code getBefore()}. */
+    public long getBeforeRepairExtraVerifiedIndexRowCount() {
+        return getBefore().getExtraVerifiedIndexRowCount();
+    }
+
+    /** Returns the extra unverified index row count held by the result of {@code getBefore()}. */
+    public long getBeforeRepairExtraUnverifiedIndexRowCount() {
+        return getBefore().getExtraUnverifiedIndexRowCount();
+    }
+
public long getAfterRebuildValidIndexRowCount() {
return getAfter().getValidIndexRowCount();
}
@@ -400,6 +444,10 @@ public class IndexToolVerificationResult {
public long getAfterIndexHasExtraCellsCount() { return getAfter().getIndexHasExtraCellsCount(); }
+    /** Returns the extra verified index row count held by the result of {@code getAfter()}. */
+    public long getAfterRepairExtraVerifiedIndexRowCount() {
+        return getAfter().getExtraVerifiedIndexRowCount();
+    }
+
+    /** Returns the extra unverified index row count held by the result of {@code getAfter()}. */
+    public long getAfterRepairExtraUnverifiedIndexRowCount() {
+        return getAfter().getExtraUnverifiedIndexRowCount();
+    }
+
private void addScannedDataRowCount(long count) {
this.setScannedDataRowCount(this.getScannedDataRowCount() + count);
}
@@ -451,6 +499,14 @@ public class IndexToolVerificationResult {
getBefore().setUnknownIndexRowCount(getBefore().getUnknownIndexRowCount() + count);
}
+    /**
+     * Adds {@code count} to the extra verified index row count held by the result of
+     * {@code getBefore()}.
+     */
+    public void addBeforeRepairExtraVerifiedIndexRowCount(long count) {
+        final long updatedCount = getBefore().getExtraVerifiedIndexRowCount() + count;
+        getBefore().setExtraVerifiedIndexRowCount(updatedCount);
+    }
+
+    /**
+     * Adds {@code count} to the extra unverified index row count held by the result of
+     * {@code getBefore()}.
+     */
+    public void addBeforeRepairExtraUnverifiedIndexRowCount(long count) {
+        final long updatedCount = getBefore().getExtraUnverifiedIndexRowCount() + count;
+        getBefore().setExtraUnverifiedIndexRowCount(updatedCount);
+    }
+
private void addAfterRebuildValidIndexRowCount(long count) {
getAfter().setValidIndexRowCount(getAfter().getValidIndexRowCount() + count);
}
@@ -483,6 +539,14 @@ public class IndexToolVerificationResult {
getAfter().setIndexHasExtraCellsCount(getAfter().getIndexHasExtraCellsCount() + count);
}
+    /**
+     * Adds {@code count} to the extra verified index row count held by the result of
+     * {@code getAfter()}.
+     */
+    public void addAfterRepairExtraVerifiedIndexRowCount(long count) {
+        final long updatedCount = getAfter().getExtraVerifiedIndexRowCount() + count;
+        getAfter().setExtraVerifiedIndexRowCount(updatedCount);
+    }
+
+    /**
+     * Adds {@code count} to the extra unverified index row count held by the result of
+     * {@code getAfter()}.
+     */
+    public void addAfterRepairExtraUnverifiedIndexRowCount(long count) {
+        final long updatedCount = getAfter().getExtraUnverifiedIndexRowCount() + count;
+        getAfter().setExtraUnverifiedIndexRowCount(updatedCount);
+    }
+
private static boolean isAfterRebuildInvalidIndexRowCount(Cell cell) {
if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_BYTES, 0,
@@ -525,6 +589,10 @@ public class IndexToolVerificationResult {
addBeforeOldIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT_BYTES)) {
addBeforeUnknownIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES)) {
+ addBeforeRepairExtraVerifiedIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES)) {
+ addBeforeRepairExtraUnverifiedIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES)) {
addAfterRebuildValidIndexRowCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT_BYTES)) {
@@ -541,13 +609,18 @@ public class IndexToolVerificationResult {
addAfterIndexHasExtraCellsCount(getValue(cell));
} else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES)) {
addAfterIndexHasMissingCellsCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES)) {
+ addAfterRepairExtraVerifiedIndexRowCount(getValue(cell));
+ } else if (CellUtil.matchingColumn(cell, RESULT_TABLE_COLUMN_FAMILY, AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES)) {
+ addAfterRepairExtraUnverifiedIndexRowCount(getValue(cell));
}
}
public boolean isVerificationFailed() {
//we don't want to count max look back failures alone as failing an index rebuild job
//so we omit them from the below calculation.
- if (getAfter().getInvalidIndexRowCount() + getAfter().getMissingIndexRowCount() > 0) {
+ if (getAfter().getInvalidIndexRowCount() + getAfter().getMissingIndexRowCount()
+ + getAfter().getExtraVerifiedIndexRowCount() > 0) {
return true;
}
return false;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 056daf6..5ade9b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -20,23 +20,36 @@ package org.apache.phoenix.mapreduce;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.Collections;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
-import org.apache.phoenix.compile.*;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.ServerBuildIndexCompiler;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.*;
-import org.apache.phoenix.util.*;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +60,7 @@ import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getDisa
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolLastVerifyTime;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolSourceTable;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolStartTime;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.setCurrentScnValue;
@@ -68,6 +82,46 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
public PhoenixServerBuildIndexInputFormat() {
}
+ /**
+ * Produces the QueryPlan for an index tool run from a connection and the full names of the
+ * data table and the index table.
+ */
+ private interface QueryPlanBuilder {
+ QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
+ String indexTableFullName) throws SQLException;
+ }
+
+    /**
+     * Builds the query plan by compiling the index table with a ServerBuildIndexCompiler
+     * created for the data table.
+     * <p>
+     * Declared {@code static} because it reads no state of the enclosing input format and
+     * therefore does not need a hidden reference to it.
+     */
+    private static class DataTableQueryPlanBuilder implements QueryPlanBuilder {
+        /**
+         * @param phoenixConnection connection used to look up the index table and to compile
+         * @param dataTableFullName full name of the data table handed to the compiler
+         * @param indexTableFullName full name of the index table, looked up without the cache
+         * @return the query plan of the compiled mutation plan
+         * @throws SQLException if the table lookup or the compilation fails
+         */
+        @Override
+        public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
+                String indexTableFullName) throws SQLException {
+            PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
+            ServerBuildIndexCompiler compiler =
+                    new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
+            MutationPlan plan = compiler.compile(indexTable);
+            return plan.getQueryPlan();
+        }
+    }
+
+    /**
+     * Builds the query plan by compiling a {@code SELECT count(*)} over the index table and
+     * then attaching the index maintainer metadata, the rebuild flag and the client version
+     * to the plan's scan.
+     * <p>
+     * Declared {@code static} because it reads no instance state of the enclosing input
+     * format and therefore does not need a hidden reference to it.
+     */
+    private static class IndexTableQueryPlanBuilder implements QueryPlanBuilder {
+        /**
+         * @param phoenixConnection connection used to compile the query and look up the data table
+         * @param dataTableFullName full name of the data table whose index maintainer is serialized
+         * @param indexTableFullName full name of the index table that the compiled query reads
+         * @return the compiled plan; its scan carries the attributes set here
+         * @throws SQLException if compilation or the data table lookup fails
+         */
+        @Override
+        public QueryPlan getQueryPlan(PhoenixConnection phoenixConnection, String dataTableFullName,
+                String indexTableFullName) throws SQLException {
+            QueryPlan plan;
+            try (final PhoenixStatement statement = new PhoenixStatement(phoenixConnection)) {
+                // NOTE(review): a table identifier cannot be bound as a statement parameter, so
+                // the name is concatenated as-is; callers must pass a trusted, well-formed name.
+                String query = "SELECT count(*) FROM " + indexTableFullName;
+                plan = statement.compileQuery(query);
+                TableRef tableRef = plan.getTableRef();
+                Scan scan = plan.getContext().getScan();
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                PTable pIndexTable = tableRef.getTable();
+                PTable pDataTable = PhoenixRuntime.getTable(phoenixConnection, dataTableFullName);
+                // Serialize the maintainer of this single index and attach it to the scan.
+                IndexMaintainer.serialize(pDataTable, ptr, Collections.singletonList(pIndexTable), phoenixConnection);
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+                scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+            }
+            return plan;
+        }
+    }
+
+ private QueryPlanBuilder queryPlanBuilder;
+
@Override
protected QueryPlan getQueryPlan(final JobContext context, final Configuration configuration)
throws IOException {
@@ -90,6 +144,9 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
}
String dataTableFullName = getIndexToolDataTableName(configuration);
String indexTableFullName = getIndexToolIndexTableName(configuration);
+ SourceTable sourceTable = getIndexToolSourceTable(configuration);
+ queryPlanBuilder = sourceTable.equals(SourceTable.DATA_TABLE_SOURCE) ?
+ new DataTableQueryPlanBuilder() : new IndexTableQueryPlanBuilder();
try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
@@ -97,11 +154,10 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
setCurrentScnValue(configuration, scn);
Long startTime = (startTimeValue == null) ? 0L : Long.valueOf(startTimeValue);
- PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
- ServerBuildIndexCompiler compiler =
- new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
- MutationPlan plan = compiler.compile(indexTable);
- Scan scan = plan.getContext().getScan();
+
+ queryPlan = queryPlanBuilder.getQueryPlan(phoenixConnection, dataTableFullName, indexTableFullName);
+ Scan scan = queryPlan.getContext().getScan();
+
Long lastVerifyTimeValue = lastVerifyTime == null ? 0L : Long.valueOf(lastVerifyTime);
try {
scan.setTimeRange(startTime, scn);
@@ -126,7 +182,6 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
} catch (IOException e) {
throw new SQLException(e);
}
- queryPlan = plan.getQueryPlan();
// since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver
if (txnScnValue != null) {
scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index e112f76..75e352d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -85,6 +84,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.PhoenixServerBuildIndexInputFormat;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.index.SourceTargetColumnNames.DataSourceColNames;
import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
@@ -200,6 +200,7 @@ public class IndexTool extends Configured implements Tool {
private boolean isPartialBuild, isForeground;
private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
private IndexDisableLoggingType disableLoggingType = IndexDisableLoggingType.NONE;
+ private SourceTable sourceTable = SourceTable.DATA_TABLE_SOURCE;
private String qDataTable;
private String qIndexTable;
private boolean useSnapshot;
@@ -285,6 +286,12 @@ public class IndexTool extends Configured implements Tool {
, "Disable logging of failed verification rows for BEFORE, " +
"AFTER, or BOTH verify jobs");
+ private static final Option USE_INDEX_TABLE_AS_SOURCE_OPTION =
+ new Option("fi", "from-index", false,
+ "To verify every row in the index table has a corresponding row in the data table. "
+ + "Only supported for global indexes. If this option is used with -v AFTER, these "
+ + "extra rows will be identified but not repaired.");
+
public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s";
public static final String INVALID_TIME_RANGE_EXCEPTION_MESSAGE = "startTime is greater than "
@@ -294,7 +301,6 @@ public class IndexTool extends Configured implements Tool {
public static final String FEATURE_NOT_APPLICABLE = "start-time/end-time and retry verify feature are only "
+ "applicable for local or non-transactional global indexes";
-
public static final String RETRY_VERIFY_NOT_APPLICABLE = "retry verify feature accepts "
+ "non-zero ts set in the past and ts must be present in PHOENIX_INDEX_TOOL_RESULT table";
@@ -323,6 +329,7 @@ public class IndexTool extends Configured implements Tool {
options.addOption(END_TIME_OPTION);
options.addOption(RETRY_VERIFY_OPTION);
options.addOption(DISABLE_LOGGING_OPTION);
+ options.addOption(USE_INDEX_TABLE_AS_SOURCE_OPTION);
return options;
}
@@ -445,6 +452,8 @@ public class IndexTool extends Configured implements Tool {
return disableLoggingType;
}
+    /** Returns the value of the {@code sourceTable} field. */
+    public IndexScrutinyTool.SourceTable getSourceTable() {
+        return this.sourceTable;
+    }
+
class JobFactory {
Connection connection;
Configuration configuration;
@@ -692,6 +701,7 @@ public class IndexTool extends Configured implements Tool {
PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
+ PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable);
if (startTime != null) {
PhoenixConfigurationUtil.setIndexToolStartTime(configuration, startTime);
}
@@ -849,6 +859,7 @@ public class IndexTool extends Configured implements Tool {
boolean retryVerify = cmdLine.hasOption(RETRY_VERIFY_OPTION.getOpt());
boolean verify = cmdLine.hasOption(VERIFY_OPTION.getOpt());
boolean disableLogging = cmdLine.hasOption(DISABLE_LOGGING_OPTION.getOpt());
+ boolean useIndexTableAsSource = cmdLine.hasOption(USE_INDEX_TABLE_AS_SOURCE_OPTION.getOpt());
if (useTenantId) {
tenantId = cmdLine.getOptionValue(TENANT_ID_OPTION.getOpt());
@@ -875,6 +886,11 @@ public class IndexTool extends Configured implements Tool {
cmdLine.getOptionValue(DISABLE_LOGGING_OPTION.getOpt()));
}
}
+
+ if (useIndexTableAsSource) {
+ sourceTable = SourceTable.INDEX_TABLE_SOURCE;
+ }
+
schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
index c7a2753..34c46de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationOutputRepository.java
@@ -92,6 +92,7 @@ public class IndexVerificationOutputRepository implements AutoCloseable {
public enum IndexVerificationErrorType {
INVALID_ROW,
MISSING_ROW,
+ EXTRA_ROW,
EXTRA_CELLS,
BEYOND_MAX_LOOKBACK_INVALID,
BEYOND_MAX_LOOKBACK_MISSING,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
index 963b593..de25606 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexVerificationResultRepository.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compat.hbase.CompatUtil;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.table.HTableFactory;
@@ -122,6 +121,20 @@ public class IndexVerificationResultRepository implements AutoCloseable {
public static String AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS = "AfterRebuildInvalidIndexRowCountCozMissingCells";
public final static byte[] AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES = Bytes.toBytes(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS);
+ public final static String BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT = "BeforeRepairExtraVerifiedIndexRowCount";
+ public final static byte[] BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES =
+ Bytes.toBytes(BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT);
+ public final static String BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT = "BeforeRepairExtraUnverifiedIndexRowCount";
+ public final static byte[] BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES =
+ Bytes.toBytes(BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT);
+
+ public final static String AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT = "AfterRepairExtraVerifiedIndexRowCount";
+ public final static byte[] AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES =
+ Bytes.toBytes(AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT);
+ public final static String AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT = "AfterRepairExtraUnverifiedIndexRowCount";
+ public final static byte[] AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES =
+ Bytes.toBytes(AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT);
+
/***
* Only usable for read / create methods. To write use setResultTable and setIndexTable first
*/
@@ -244,6 +257,10 @@ public class IndexVerificationResultRepository implements AutoCloseable {
Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildOldIndexRowCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT_BYTES,
Bytes.toBytes(Long.toString(verificationResult.getBeforeRebuildUnknownIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES,
+ Bytes.toBytes(Long.toString(verificationResult.getBeforeRepairExtraVerifiedIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES,
+ Bytes.toBytes(Long.toString(verificationResult.getBeforeRepairExtraUnverifiedIndexRowCount())));
}
if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.BOTH) {
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_VALID_INDEX_ROW_COUNT_BYTES,
@@ -262,6 +279,10 @@ public class IndexVerificationResultRepository implements AutoCloseable {
Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasExtraCellsCount())));
put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS_BYTES,
Bytes.toBytes(Long.toString(verificationResult.getAfterIndexHasMissingCellsCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT_BYTES,
+ Bytes.toBytes(Long.toString(verificationResult.getAfterRepairExtraVerifiedIndexRowCount())));
+ put.addColumn(RESULT_TABLE_COLUMN_FAMILY, AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT_BYTES,
+ Bytes.toBytes(Long.toString(verificationResult.getAfterRepairExtraUnverifiedIndexRowCount())));
}
resultTable.put(put);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
index 8cee869..953e61f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectReducer.java
@@ -94,6 +94,10 @@ public class PhoenixIndexImportDirectReducer extends
setValue(verificationResult.getBeforeRebuildOldIndexRowCount());
context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).
setValue(verificationResult.getBeforeRebuildUnknownIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT).
+ setValue(verificationResult.getBeforeRepairExtraVerifiedIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT).
+ setValue(verificationResult.getBeforeRepairExtraUnverifiedIndexRowCount());
}
if (verifyType == IndexTool.IndexVerifyType.BOTH || verifyType == IndexTool.IndexVerifyType.AFTER) {
context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT).
@@ -112,6 +116,10 @@ public class PhoenixIndexImportDirectReducer extends
setValue(verificationResult.getAfterIndexHasExtraCellsCount());
context.getCounter(PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS).
setValue(verificationResult.getAfterIndexHasMissingCellsCount());
+ context.getCounter(PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT).
+ setValue(verificationResult.getAfterRepairExtraVerifiedIndexRowCount());
+ context.getCounter(PhoenixIndexToolJobCounters.AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT).
+ setValue(verificationResult.getAfterRepairExtraUnverifiedIndexRowCount());
}
if (verificationResult.isVerificationFailed()) {
throw new IOException("Index verification failed! " + verificationResult);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
index c6c6ec7..8016bce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
@@ -35,6 +35,8 @@ public enum PhoenixIndexToolJobCounters {
BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT,
BEFORE_REBUILD_OLD_INDEX_ROW_COUNT,
BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT,
+ BEFORE_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT,
+ BEFORE_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT,
AFTER_REBUILD_VALID_INDEX_ROW_COUNT,
AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT,
AFTER_REBUILD_MISSING_INDEX_ROW_COUNT,
@@ -42,5 +44,7 @@ public enum PhoenixIndexToolJobCounters {
AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT,
AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT,
AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_EXTRA_CELLS,
- AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS
+ AFTER_REBUILD_INVALID_INDEX_ROW_COUNT_COZ_MISSING_CELLS,
+ AFTER_REPAIR_EXTRA_VERIFIED_INDEX_ROW_COUNT,
+ AFTER_REPAIR_EXTRA_UNVERIFIED_INDEX_ROW_COUNT
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 792b621..1d8fa57 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.index.IndexTool;
@@ -125,6 +126,8 @@ public final class PhoenixConfigurationUtil {
public static final String INDEX_TOOL_INDEX_TABLE_NAME = "phoenix.mr.index_tool.index.table.name";
+ public static final String INDEX_TOOL_SOURCE_TABLE = "phoenix.mr.index_tool.source.table";
+
public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table";
public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size";
@@ -703,6 +706,19 @@ public final class PhoenixConfigurationUtil {
return configuration.get(INDEX_TOOL_INDEX_TABLE_NAME);
}
+ /** Stores the enum name of {@code sourceTable} under INDEX_TOOL_SOURCE_TABLE; both arguments must be non-null. */
+ public static void setIndexToolSourceTable(Configuration configuration,
+ IndexScrutinyTool.SourceTable sourceTable) {
+ Preconditions.checkNotNull(configuration);
+ configuration.set(INDEX_TOOL_SOURCE_TABLE, Preconditions.checkNotNull(sourceTable).name());
+ }
+
+ public static IndexScrutinyTool.SourceTable getIndexToolSourceTable(Configuration configuration) {
+ Preconditions.checkNotNull(configuration);
+ final String fallback = SourceTable.DATA_TABLE_SOURCE.name(); // returned by get() when the key is unset
+ return SourceTable.valueOf(configuration.get(INDEX_TOOL_SOURCE_TABLE, fallback));
+ }
+
public static void setScrutinySourceTable(Configuration configuration,
SourceTable sourceTable) {
Preconditions.checkNotNull(configuration);
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
index 87215d0..916552b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexToolTest.java
@@ -19,10 +19,12 @@ package org.apache.phoenix.index;
import org.apache.commons.cli.CommandLine;
import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexScrutiny;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -34,9 +36,8 @@ import org.mockito.MockitoAnnotations;
import static org.apache.phoenix.mapreduce.index.IndexTool.FEATURE_NOT_APPLICABLE;
import static org.apache.phoenix.mapreduce.index.IndexTool.INVALID_TIME_RANGE_EXCEPTION_MESSAGE;
-import static org.junit.Assert.assertEquals;
import static org.apache.phoenix.mapreduce.index.IndexTool.RETRY_VERIFY_NOT_APPLICABLE;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
public class IndexToolTest extends BaseTest {
@@ -335,4 +336,35 @@ public class IndexToolTest extends BaseTest {
CommandLine cmdLine = it.parseOptions(args);
}
+ @Test
+ public void testIndexToolDefaultSource() throws Exception {
+ // Default arguments (no explicit source selection): the data table is expected as the source.
+ final Long scanStart = 1L;
+ final Long scanEnd = 10L;
+ final String[] toolArgs =
+ IndexToolIT.getArgValues(true, true, schema, dataTable, indexTable, tenantId,
+ IndexTool.IndexVerifyType.NONE, scanStart, scanEnd);
+ final CommandLine parsed = it.parseOptions(toolArgs);
+ it.populateIndexToolAttributes(parsed);
+ assertEquals(IndexScrutinyTool.SourceTable.DATA_TABLE_SOURCE, it.getSourceTable());
+ }
+
+ @Test
+ public void testIndexToolFromIndexSource() throws Exception {
+ verifyFromIndexOption(IndexTool.IndexVerifyType.ONLY); // helper asserts INDEX_TABLE_SOURCE is resolved
+ verifyFromIndexOption(IndexTool.IndexVerifyType.BEFORE); // same expectation for the BEFORE verify type
+ }
+
+ /** Parses IndexTool arguments built for {@code verifyType} and asserts the index table is resolved as source. */
+ private void verifyFromIndexOption(IndexTool.IndexVerifyType verifyType) throws Exception {
+ final Long scanStart = 1L;
+ final Long scanEnd = 10L;
+ final String[] toolArgs = IndexToolIT.getArgValues(true, true, schema, dataTable, indexTable,
+ tenantId, verifyType, scanStart, scanEnd,
+ IndexTool.IndexDisableLoggingType.BEFORE, null, true);
+ final CommandLine parsed = it.parseOptions(toolArgs);
+ it.populateIndexToolAttributes(parsed);
+ assertEquals(IndexScrutinyTool.SourceTable.INDEX_TABLE_SOURCE, it.getSourceTable());
+ }
+
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index b7eefc6..bf477d2 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -357,7 +357,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
@Test
public void testVerifySingleIndexRow_expiredIndexRowCount_nonZero() throws IOException {
IndexToolVerificationResult.PhaseResult
- expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0, 0, 0,0,0);
+ expectedPR = new IndexToolVerificationResult.PhaseResult(0, 1, 0, 0, 0, 0, 0, 0, 0, 0);
try {
for (Map.Entry<byte[], List<Mutation>>
entry : indexKeyToMutationMap.entrySet()) {
@@ -510,7 +510,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
assertTrue(rebuildScanner.verifySingleIndexRow(indexRow.getRow(), actualMutations,
indexKeyToMutationMap.get(indexRow.getRow()), mostRecentIndexRowKeys, Collections.EMPTY_LIST, actualPR, false));
// validIndexRowCount = 1
- IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0);
+ IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0, 0, 0);
assertTrue(actualPR.equals(expectedPR));
}
@@ -566,7 +566,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
// Report this validation as a failure
assertFalse(rebuildScanner.verifySingleIndexRow(indexRow.getRow(), actualMutations, expectedMutations, mostRecentIndexRowKeys, new ArrayList<Mutation>(), actualPR, true));
// beyondMaxLookBackInvalidIndexRowCount = 1
- IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1, 0, 0);
+ IndexToolVerificationResult.PhaseResult expectedPR = new IndexToolVerificationResult.PhaseResult(0, 0, 0, 0, 0, 1, 0, 0, 0, 0);
assertTrue(actualPR.equals(expectedPR));
}
@@ -580,11 +580,11 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
}
private IndexToolVerificationResult.PhaseResult getValidPhaseResult() {
- return new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0);
+ return new IndexToolVerificationResult.PhaseResult(1, 0, 0, 0, 0, 0, 0, 0, 0, 0); // validIndexRowCount = 1, all other counters 0
}
private IndexToolVerificationResult.PhaseResult getInvalidPhaseResult() {
- return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1, 0, 0, 0, 0);
+ return new IndexToolVerificationResult.PhaseResult(0, 0, 0, 1, 0, 0, 0, 0, 0, 0); // only the 4th counter is 1 (presumably invalidIndexRowCount -- confirm against PhaseResult)
}
private void initializeLocalMockitoSetup(Map.Entry<byte[], List<Mutation>> entry,
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
index 840e9d5..c316fa4 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java
@@ -25,6 +25,7 @@ import java.sql.DriverManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.MRJobType;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
@@ -312,4 +313,21 @@ public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest {
Long.parseLong(PhoenixConfigurationUtil.getIndexToolLastVerifyTime(configuration)));
}
+
+ @Test
+ public void testIndexToolSourceConfig() {
+ final Configuration conf = new Configuration();
+
+ // by default source is data table (assertEquals takes the expected value first)
+ SourceTable sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
+ Assert.assertEquals(SourceTable.DATA_TABLE_SOURCE, sourceTable);
+ // an explicitly configured index-table source is read back
+ PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.INDEX_TABLE_SOURCE);
+ sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
+ Assert.assertEquals(SourceTable.INDEX_TABLE_SOURCE, sourceTable);
+ // setting the data table again overrides the previous value
+ PhoenixConfigurationUtil.setIndexToolSourceTable(conf, SourceTable.DATA_TABLE_SOURCE);
+ sourceTable = PhoenixConfigurationUtil.getIndexToolSourceTable(conf);
+ Assert.assertEquals(SourceTable.DATA_TABLE_SOURCE, sourceTable);
+ }
}