You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sk...@apache.org on 2020/04/23 05:30:13 UTC
[phoenix] branch 4.x updated: PHOENIX-5804: Implement strong
verification with -v ONLY option for old design of secondary indexes (#758)
This is an automated email from the ASF dual-hosted git repository.
skadam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
new b8ba647 PHOENIX-5804: Implement strong verification with -v ONLY option for old design of secondary indexes (#758)
b8ba647 is described below
commit b8ba647664c50cc6f677f463f928d3f9bc214766
Author: Swaroopa Kadam <sw...@gmail.com>
AuthorDate: Wed Apr 22 22:30:00 2020 -0700
PHOENIX-5804: Implement strong verification with -v ONLY option for old design of secondary indexes (#758)
Co-authored-by: s.kadam <s....@apache.org>
---
.../end2end/IndexToolForNonTxGlobalIndexIT.java | 483 +++++++++++++++++++++
.../org/apache/phoenix/end2end/IndexToolIT.java | 398 +----------------
.../end2end/IndexVerificationOldDesignIT.java | 161 +++++++
.../coprocessor/GlobalIndexRegionScanner.java | 203 +++++++++
.../coprocessor/IndexRebuildRegionScanner.java | 182 +-------
.../phoenix/coprocessor/IndexerRegionScanner.java | 354 +++++++++++++++
.../UngroupedAggregateRegionObserver.java | 23 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 10 +-
.../index/PhoenixIndexToolJobCounters.java | 2 +-
.../phoenix/index/VerifySingleIndexRowTest.java | 3 +-
10 files changed, 1248 insertions(+), 571 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
new file mode 100644
index 0000000..e151040
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.IndexScrutiny;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for IndexTool on non-transactional global indexes. The suite is parameterized on
+ * whether the data table is mutable; IndexTool is always invoked through the direct API without snapshots.
+ */
+@RunWith(Parameterized.class)
+public class IndexToolForNonTxGlobalIndexIT extends BaseUniqueNamesOwnClusterIT {
+
+    private final String tableDDLOptions;
+    private final boolean directApi = true;
+    private final boolean useSnapshot = false;
+    private final boolean mutable;
+
+    public IndexToolForNonTxGlobalIndexIT(boolean mutable) {
+        StringBuilder optionBuilder = new StringBuilder();
+        this.mutable = mutable;
+        if (!mutable) {
+            optionBuilder.append(" IMMUTABLE_ROWS=true ");
+        }
+        optionBuilder.append(" SPLIT ON(1,2)");
+        this.tableDDLOptions = optionBuilder.toString();
+    }
+
+    @Parameterized.Parameters(name = "mutable={0}")
+    public static synchronized Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {true},
+            {false} });
+    }
+
+    @BeforeClass
+    public static synchronized void setup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(4);
+        serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+            QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(4);
+        clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+        clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+        clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+            new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    @Test
+    public void testWithSetNull() throws Exception {
+        // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
+        // and after that the index table is still rebuilt correctly
+        if (!this.mutable) {
+            return;
+        }
+        final int NROWS = 2 * 3 * 5 * 7;
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
+                + tableDDLOptions);
+            String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(upsertStmt);
+            IndexToolIT.setEveryNthRowWithNull(NROWS, 2, stmt);
+            conn.commit();
+            IndexToolIT.setEveryNthRowWithNull(NROWS, 3, stmt);
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                "CREATE INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ", indexTableName, dataTableFullName));
+            // Run the index MR job and verify that the index table is built correctly
+            IndexTool indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            IndexToolIT.setEveryNthRowWithNull(NROWS, 5, stmt);
+            conn.commit();
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            IndexToolIT.setEveryNthRowWithNull(NROWS, 7, stmt);
+            conn.commit();
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+            indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
+                0, IndexTool.IndexVerifyType.ONLY, new String[0]);
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            // Insert a row
+            conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+            conn.commit();
+            conn.createStatement().execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
+                indexTableName, dataTableFullName));
+            // The ASYNC index has not been built yet, so a verify-only run should report a missing index row
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                IndexTool.IndexVerifyType.ONLY);
+            Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
+                indexTableFullName);
+            assertMissingIndexRowErrorMessage(cell);
+
+            // Run the index tool to populate the index while verifying rows
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                IndexTool.IndexVerifyType.AFTER);
+
+            // Set ttl of index table ridiculously low so that all data is expired
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexTable = TableName.valueOf(indexTableFullName);
+            HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0];
+            desc.setTimeToLive(1);
+            admin.modifyColumn(indexTable, desc);
+            Thread.sleep(1000);
+            Pair<Integer, Integer> status = admin.getAlterStatus(indexTable);
+            // Poll the alter status every 2 seconds, giving up after 20 attempts instead of spinning forever
+            for (int retry = 0; retry < 20 && status.getFirst() != 0; retry++) {
+                Thread.sleep(2000);
+                status = admin.getAlterStatus(indexTable);
+            }
+            assertTrue("Alter of index table " + indexTableFullName + " did not complete", status.getFirst() == 0);
+
+            TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+            // Run the index tool using the only-verify option, verify it gives no mismatch
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
+                IndexTool.IndexVerifyType.ONLY);
+            Scan scan = new Scan();
+            // Close both the table and the scanner so the test does not leak HBase client resources
+            try (Table hIndexToolTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                    .getTable(indexToolOutputTable.getName());
+                    org.apache.hadoop.hbase.client.ResultScanner scanner = hIndexToolTable.getScanner(scan)) {
+                Result r = scanner.next();
+                Assert.assertNull("Expected no rows in the index tool output table", r);
+            }
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testSecondaryGlobalIndexFailure() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String stmString1 =
+                "CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                    + tableDDLOptions;
+            conn.createStatement().execute(stmString1);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+            // Insert two rows
+            IndexToolIT.upsertRow(stmt1, 1);
+            IndexToolIT.upsertRow(stmt1, 2);
+            conn.commit();
+
+            String stmtString2 =
+                String.format(
+                    "CREATE INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ", indexTableName, dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+
+            // Run the index MR job.
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
+
+            String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
+
+            // Verify that the index table is in the ACTIVE state
+            assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
+
+            ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
+            Admin admin = queryServices.getAdmin();
+            TableName tableName = TableName.valueOf(qIndexTableName);
+            admin.disableTable(tableName);
+
+            // Run the index MR job and it should fail (return -1)
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                null, -1, new String[0]);
+
+            // Verify that the index table should be still in the ACTIVE state
+            assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
+        }
+    }
+
+    @Test
+    public void testBuildSecondaryIndexAndScrutinize() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String stmString1 =
+                "CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                    + tableDDLOptions;
+            conn.createStatement().execute(stmString1);
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+
+            // Insert NROWS rows
+            final int NROWS = 1000;
+            for (int i = 0; i < NROWS; i++) {
+                IndexToolIT.upsertRow(stmt1, i);
+            }
+            conn.commit();
+            String stmtString2 =
+                String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ", indexTableName, dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+
+            // Run the index MR job and verify that the index table is built correctly
+            IndexTool indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+
+            // Add more rows and make sure that these rows will be visible to IndexTool
+            for (int i = NROWS; i < 2 * NROWS; i++) {
+                IndexToolIT.upsertRow(stmt1, i);
+            }
+            conn.commit();
+            indexTool = IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
+            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+            assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(2 * NROWS, actualRowCount);
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String dataTableName = generateUniqueName();
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            String indexTableName = generateUniqueName();
+            String viewName = generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                + tableDDLOptions);
+            conn.commit();
+            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+            conn.commit();
+            // Insert a row
+            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+            TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, IndexToolIT.MutationCountingRegionObserver.class);
+            // The counter is accessed through static methods, so reset it before asserting its absolute value
+            IndexToolIT.MutationCountingRegionObserver.setMutationCount(0);
+            // Run the index MR job and verify that the index table rebuild succeeds
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                null, 0, IndexTool.IndexVerifyType.AFTER);
+            assertEquals(1, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
+            IndexToolIT.MutationCountingRegionObserver.setMutationCount(0);
+            // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
+            // write any index rows
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                null, 0, IndexTool.IndexVerifyType.BEFORE);
+            assertEquals(0, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
+            // The "-v BOTH" option should not write any index rows either
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                null, 0, IndexTool.IndexVerifyType.BOTH);
+            assertEquals(0, IndexToolIT.MutationCountingRegionObserver.getMutationCount());
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testIndexToolVerifyAfterOption() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String dataTableName = generateUniqueName();
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            String indexTableName = generateUniqueName();
+            String viewName = generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                + tableDDLOptions);
+            conn.commit();
+            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+            conn.commit();
+            // Insert a row
+            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+            conn.commit();
+            // Configure IndexRegionObserver to fail the first write phase. This should not
+            // lead to any change on index and thus the index verify during index rebuild should fail
+            IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
+            try {
+                conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+                // Run the index MR job and verify that the index table rebuild fails
+                IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, -1, IndexTool.IndexVerifyType.AFTER);
+                // The index tool output table should report that there is a missing index row
+                Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
+                    "_IDX_" + dataTableFullName);
+                assertMissingIndexRowErrorMessage(cell);
+            } finally {
+                // Reset the static testing hook even if an assertion fails, so later tests still write index rows
+                IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
+            }
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    @Test
+    public void testIndexToolOnlyVerifyOption() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            // Insert a row
+            conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
+            // Run the index MR job to only verify that each data table row has a corresponding index row
+            // IndexTool will go through each data table row and record the mismatches in the output table
+            // called PHOENIX_INDEX_TOOL
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                null, 0, IndexTool.IndexVerifyType.ONLY);
+            Cell cell = IndexToolIT.getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
+            assertMissingIndexRowErrorMessage(cell);
+            // Delete the output table for the next test
+            IndexToolIT.dropIndexToolTables(conn);
+            // Run the index tool to populate the index while verifying rows
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                null, 0, IndexTool.IndexVerifyType.AFTER);
+            IndexToolIT.runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                null, 0, IndexTool.IndexVerifyType.ONLY);
+            IndexToolIT.dropIndexToolTables(conn);
+        }
+    }
+
+    /**
+     * Asserts that {@code cell}, read from the index tool output table, holds the missing-index-row error
+     * message. A {@code null} cell fails with a descriptive message instead of a NullPointerException.
+     */
+    private static void assertMissingIndexRowErrorMessage(Cell cell) {
+        Assert.assertNotNull("Expected an error message cell in the IndexToolOutputTable", cell);
+        String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+        assertEquals(IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW, actualErrorMsg);
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 02b253f..e5db314 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -189,7 +189,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
return TestUtil.filterTxParamData(list,0);
}
- private void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception {
+ protected static void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception {
for (int i = 1; i <= nrows; i++) {
stmt.setInt(1, i);
stmt.setInt(2, i + 1);
@@ -203,66 +203,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
- public void testWithSetNull() throws Exception {
- // This test is for building non-transactional mutable global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot || useTenantId || !mutable) {
- return;
- }
- // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
- // and after that the index table is still rebuilt correctly
- final int NROWS = 2 * 3 * 5 * 7;
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName();
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- String indexTableName = generateUniqueName();
- String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE TABLE " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, VAL1 INTEGER, VAL2 INTEGER) "
- + tableDDLOptions);
- String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
- PreparedStatement stmt = conn.prepareStatement(upsertStmt);
- setEveryNthRowWithNull(NROWS, 2, stmt);
- conn.commit();
- setEveryNthRowWithNull(NROWS, 3, stmt);
- conn.commit();
- conn.createStatement().execute(String.format(
- "CREATE %s INDEX %s ON %s (VAL1) INCLUDE (VAL2) ASYNC ",
- (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName));
- // Run the index MR job and verify that the index table is built correctly
- IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
- long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
- assertEquals(NROWS, actualRowCount);
- actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
- assertEquals(NROWS, actualRowCount);
- setEveryNthRowWithNull(NROWS, 5, stmt);
- conn.commit();
- actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
- assertEquals(NROWS, actualRowCount);
- setEveryNthRowWithNull(NROWS, 7, stmt);
- conn.commit();
- actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
- assertEquals(NROWS, actualRowCount);
- actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
- assertEquals(NROWS, actualRowCount);
- indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null,
- 0, IndexTool.IndexVerifyType.ONLY, new String[0]);
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
- dropIndexToolTables(conn);
- }
- }
-
- @Test
public void testSecondaryIndex() throws Exception {
String schemaName = generateUniqueName();
String dataTableName = generateUniqueName();
@@ -370,7 +310,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
}
- private void dropIndexToolTables(Connection conn) throws Exception {
+ protected static void dropIndexToolTables(Connection conn) throws Exception {
Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
TableName indexToolOutputTable =
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
@@ -381,80 +321,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
admin.deleteTable(indexToolResultTable);
}
- @Test
- public void testBuildSecondaryIndexAndScrutinize() throws Exception {
- // This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
- return;
- }
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName();
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- String indexTableName = generateUniqueName();
- String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- String stmString1 =
- "CREATE TABLE " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
- + tableDDLOptions;
- conn.createStatement().execute(stmString1);
- String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
- PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-
- // Insert NROWS rows
- final int NROWS = 1000;
- for (int i = 0; i < NROWS; i++) {
- upsertRow(stmt1, i);
- }
- conn.commit();
- String stmtString2 =
- String.format(
- "CREATE %s INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC ",
- (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
- conn.createStatement().execute(stmtString2);
-
- // Run the index MR job and verify that the index table is built correctly
- IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]);
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
- long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
- assertEquals(NROWS, actualRowCount);
-
- // Add more rows and make sure that these rows will be visible to IndexTool
- for (int i = NROWS; i < 2 * NROWS; i++) {
- upsertRow(stmt1, i);
- }
- conn.commit();
- indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]);
- assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
- assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(SCANNED_DATA_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
- assertEquals(2 * NROWS, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0, indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
- actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
- assertEquals(2 * NROWS, actualRowCount);
- dropIndexToolTables(conn);
- }
- }
-
public static class MutationCountingRegionObserver extends SimpleRegionObserver {
public static AtomicInteger mutationCount = new AtomicInteger(0);
@@ -473,7 +339,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
}
- private void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) {
+ private static void verifyIndexTableRowKey(byte[] rowKey, String indexTableFullName) {
// The row key for the output table : timestamp | index table name | data row key
// The row key for the result table : timestamp | index table name | datable table region name |
// scan start row | scan stop row
@@ -489,7 +355,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
IndexVerificationResultRepository.ROW_KEY_SEPARATOR_BYTE[0]);
}
- private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
+ public static Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
throws Exception {
byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName);
@@ -536,209 +402,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
- public void testIndexToolVerifyBeforeAndBothOptions() throws Exception {
- // This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
- return;
- }
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName();
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- String indexTableName = generateUniqueName();
- String viewName = generateUniqueName();
- String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
- conn.createStatement().execute("CREATE TABLE " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
- + tableDDLOptions);
- conn.commit();
- conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
- conn.commit();
- // Insert a row
- conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
- conn.commit();
- conn.createStatement().execute(String.format(
- "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
- TestUtil.addCoprocessor(conn, "_IDX_" + dataTableFullName, MutationCountingRegionObserver.class);
- // Run the index MR job and verify that the index table rebuild succeeds
- runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
- null, 0, IndexTool.IndexVerifyType.AFTER);
- assertEquals(1, MutationCountingRegionObserver.getMutationCount());
- MutationCountingRegionObserver.setMutationCount(0);
- // Since all the rows are in the index table, running the index tool with the "-v BEFORE" option should not
- // write any index rows
- runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
- null, 0, IndexTool.IndexVerifyType.BEFORE);
- assertEquals(0, MutationCountingRegionObserver.getMutationCount());
- // The "-v BOTH" option should not write any index rows either
- runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
- null, 0, IndexTool.IndexVerifyType.BOTH);
- assertEquals(0, MutationCountingRegionObserver.getMutationCount());
- dropIndexToolTables(conn);
- }
- }
-
- @Test
- public void testIndexToolVerifyAfterOption() throws Exception {
- // This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
- return;
- }
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName();
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- String indexTableName = generateUniqueName();
- String viewName = generateUniqueName();
- String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
- conn.createStatement().execute("CREATE TABLE " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
- + tableDDLOptions);
- conn.commit();
- conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
- conn.commit();
- // Insert a row
- conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
- conn.commit();
- // Configure IndexRegionObserver to fail the first write phase. This should not
- // lead to any change on index and thus the index verify during index rebuild should fail
- IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
- conn.createStatement().execute(String.format(
- "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
- // Run the index MR job and verify that the index table rebuild fails
- runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
- null, -1, IndexTool.IndexVerifyType.AFTER);
- // The index tool output table should report that there is a missing index row
- Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
- try {
- String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
- String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- assertTrue(expectedErrorMsg.equals(actualErrorMsg));
- } catch(Exception ex){
- Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
- }
- IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
- dropIndexToolTables(conn);
- }
- }
-
- @Test
- public void testIndexToolOnlyVerifyOption() throws Exception {
- // This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
- return;
- }
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName();
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- String indexTableName = generateUniqueName();
- String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE TABLE " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
- // Insert a row
- conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
- conn.commit();
- conn.createStatement().execute(String.format(
- "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
- // Run the index MR job to only verify that each data table row has a corresponding index row
- // IndexTool will go through each data table row and record the mismatches in the output table
- // called PHOENIX_INDEX_TOOL
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, 0, IndexTool.IndexVerifyType.ONLY);
- Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
- try {
- String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
- String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- assertTrue(expectedErrorMsg.equals(actualErrorMsg));
- } catch(Exception ex) {
- Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
- }
- // Delete the output table for the next test
- dropIndexToolTables(conn);
- // Run the index tool to populate the index while verifying rows
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, 0, IndexTool.IndexVerifyType.AFTER);
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, 0, IndexTool.IndexVerifyType.ONLY);
- dropIndexToolTables(conn);
- }
- }
-
- @Test
- public void testIndexToolVerifyWithExpiredIndexRows() throws Exception {
- // This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
- return;
- }
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName();
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- String indexTableName = generateUniqueName();
- String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE TABLE " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
- // Insert a row
- conn.createStatement()
- .execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
- conn.commit();
- conn.createStatement()
- .execute(String.format("CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC",
- indexTableName, dataTableFullName));
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
- IndexTool.IndexVerifyType.ONLY);
- Cell cell =
- getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName,
- indexTableFullName);
- try {
- String expectedErrorMsg = IndexRebuildRegionScanner.ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
- String actualErrorMsg = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- assertTrue(expectedErrorMsg.equals(actualErrorMsg));
- } catch(Exception ex) {
- Assert.fail("Fail to parsing the error message from IndexToolOutputTable");
- }
-
- // Run the index tool to populate the index while verifying rows
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
- IndexTool.IndexVerifyType.AFTER);
-
- // Set ttl of index table ridiculously low so that all data is expired
- Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
- TableName indexTable = TableName.valueOf(indexTableFullName);
- HColumnDescriptor desc = admin.getTableDescriptor(indexTable).getColumnFamilies()[0];
- desc.setTimeToLive(1);
- admin.modifyColumn(indexTable, desc);
- Thread.sleep(1000);
- Pair<Integer, Integer> status = admin.getAlterStatus(indexTable);
- int retry = 0;
- while (retry < 20 && status.getFirst() != 0) {
- Thread.sleep(2000);
- status = admin.getAlterStatus(indexTable);
- }
- assertTrue(status.getFirst() == 0);
-
- TableName indexToolOutputTable = TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME_BYTES);
- admin.disableTable(indexToolOutputTable);
- admin.deleteTable(indexToolOutputTable);
- // Run the index tool using the only-verify option, verify it gives no mismatch
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0,
- IndexTool.IndexVerifyType.ONLY);
- Scan scan = new Scan();
- Table hIndexToolTable =
- conn.unwrap(PhoenixConnection.class).getQueryServices()
- .getTable(indexToolOutputTable.getName());
- Result r = hIndexToolTable.getScanner(scan).next();
- assertTrue(r == null);
- dropIndexToolTables(conn);
- }
- }
-
- @Test
public void testIndexToolWithTenantId() throws Exception {
if (!useTenantId) { return;}
String tenantId = generateUniqueName();
@@ -823,59 +486,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
@Test
- public void testSecondaryGlobalIndexFailure() throws Exception {
- // This test is for building non-transactional global indexes with direct api
- if (localIndex || transactional || !directApi || useSnapshot || useTenantId) {
- return;
- }
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName();
- String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
- String indexTableName = generateUniqueName();
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- String stmString1 =
- "CREATE TABLE " + dataTableFullName
- + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
- + tableDDLOptions;
- conn.createStatement().execute(stmString1);
- String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", dataTableFullName);
- PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-
- // Insert two rows
- upsertRow(stmt1, 1);
- upsertRow(stmt1, 2);
- conn.commit();
-
- String stmtString2 =
- String.format(
- "CREATE %s INDEX %s ON %s (LPAD(UPPER(NAME, 'en_US'),8,'x')||'_xyz') ASYNC ",
- (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
- conn.createStatement().execute(stmtString2);
-
- // Run the index MR job.
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName);
-
- String qIndexTableName = SchemaUtil.getQualifiedTableName(schemaName, indexTableName);
-
- // Verify that the index table is in the ACTIVE state
- assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
-
- ConnectionQueryServices queryServices = conn.unwrap(PhoenixConnection.class).getQueryServices();
- Admin admin = queryServices.getAdmin();
- TableName tableName = TableName.valueOf(qIndexTableName);
- admin.disableTable(tableName);
-
- // Run the index MR job and it should fail (return -1)
- runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
- null, -1, new String[0]);
-
- // Verify that the index table should be still in the ACTIVE state
- assertEquals(PIndexState.ACTIVE, TestUtil.getIndexState(conn, qIndexTableName));
- }
- }
-
- @Test
public void testSaltedVariableLengthPK() throws Exception {
if (!mutable) return;
if (transactional) return;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java
new file mode 100644
index 0000000..477f74f
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexVerificationOldDesignIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ManualEnvironmentEdge;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT;
+import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Verifies IndexTool's {@code -v ONLY} option on a cluster whose client config sets
+ * {@link QueryServices#INDEX_REGION_OBSERVER_ENABLED_ATTRIB} to false.
+ */
+public class IndexVerificationOldDesignIT extends BaseUniqueNamesOwnClusterIT {
+
+    /** Clock injected by tests that need to move time forward, e.g. past a table TTL. */
+    ManualEnvironmentEdge injectEdge;
+
+    @BeforeClass
+    public static synchronized void setup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(4);
+        serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20));
+        serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
+                QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
+        clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
+        clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
+        clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString());
+        clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
+                Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+                new ReadOnlyProps(clientProps.entrySet().iterator()));
+    }
+
+    /**
+     * Restores the real clock after each test so a {@link ManualEnvironmentEdge} injected by one
+     * test does not leak into later tests running in the same JVM.
+     */
+    @After
+    public void resetEnvironmentEdge() {
+        EnvironmentEdgeManager.reset();
+    }
+
+    /**
+     * Runs {@code -v ONLY} three times against a table with a TTL: with a consistent index, after
+     * one index row is overwritten with a mismatching value, and after the clock passes the TTL.
+     */
+    @Test
+    public void testIndexToolOnlyVerifyOption() throws Exception {
+        long ttl = 3600;
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0, TTL=" + ttl);
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, dataTableFullName));
+
+            upsertValidRows(conn, dataTableFullName);
+
+            // Every data row has a matching index row at this point
+            IndexTool indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(6, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+
+            // Overwrite the index row for ID 6 so its CODE ('G') no longer matches the data row ('F')
+            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix5', 6,'G')");
+            conn.commit();
+            indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+
+            assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(5, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+
+            // Move the clock past the table TTL (seconds -> ms); no row should then be counted.
+            // resetEnvironmentEdge() restores the real clock once the test finishes.
+            injectEdge = new ManualEnvironmentEdge();
+            injectEdge.setValue(EnvironmentEdgeManager.currentTimeMillis() + ttl * 1000);
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+
+            indexTool = IndexToolIT.runIndexTool(true, false, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+            assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT).getValue());
+        }
+    }
+
+    /**
+     * Like {@link #testIndexToolOnlyVerifyOption()} but for an index on a view (no TTL): one index
+     * row is overwritten with a mismatching value and another is deleted before verifying.
+     */
+    @Test
+    public void testIndexToolOnlyVerifyOption_viewIndex() throws Exception {
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String viewName = generateUniqueName();
+        String fullViewName = SchemaUtil.getTableName(schemaName, viewName);
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            conn.createStatement().execute("CREATE VIEW " + fullViewName
+                    + " AS SELECT * FROM " + dataTableFullName);
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE)", indexTableName, fullViewName));
+
+            upsertValidRows(conn, fullViewName);
+
+            IndexToolIT.runIndexTool(true, false, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+
+            // Make the index row for ID 6 mismatch its data row and delete the one for ID 4 (CODE 'D')
+            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix5', 6,'G')");
+            conn.createStatement().execute("delete from " + indexTableFullName + " where \"0:CODE\" = 'D'");
+            conn.commit();
+
+            IndexTool indexTool = IndexToolIT.runIndexTool(true, false, schemaName, viewName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(4, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_VALID_INDEX_ROW_COUNT).getValue());
+            assertEquals(1, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+        }
+    }
+
+    /**
+     * Upserts six rows (IDs 1-6, NAME 'Phoenix' through 'Phoenix5', CODE 'A' through 'F') into
+     * {@code table} and commits them.
+     */
+    private void upsertValidRows(Connection conn, String table) throws SQLException {
+        conn.createStatement().execute("upsert into " + table + " values (1, 'Phoenix', 'A')");
+        conn.createStatement().execute("upsert into " + table + " values (2, 'Phoenix1', 'B')");
+        conn.createStatement().execute("upsert into " + table + " values (3, 'Phoenix2', 'C')");
+        conn.createStatement().execute("upsert into " + table + " values (4, 'Phoenix3', 'D')");
+        conn.createStatement().execute("upsert into " + table + " values (5, 'Phoenix4', 'E')");
+        conn.createStatement().execute("upsert into " + table + " values (6, 'Phoenix5', 'F')");
+        conn.commit();
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
new file mode 100644
index 0000000..35a0a8a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.ServerUtil;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+
+/**
+ * Base class for the server-side region scanners that rebuild and/or verify global index rows
+ * ({@link IndexRebuildRegionScanner} and {@link IndexerRegionScanner}).
+ * <p>
+ * The constructor does the shared setup:
+ * <ul>
+ *   <li>parses the index metadata carried on the scan;</li>
+ *   <li>opens the index table;</li>
+ *   <li>creates the verification result repository and the verification thread pool.</li>
+ * </ul>
+ * The static helpers compare data-table and index-table mutations.
+ */
+public abstract class GlobalIndexRegionScanner extends BaseRegionScanner {
+
+    /** Configuration key for the maximum number of concurrent index verification threads. */
+    public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
+    public static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
+    /**
+     * Configuration key for the number of rows handed to each verification task.
+     * <p>
+     * This must not reuse the value of {@link #NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY}.
+     * The two settings are unrelated, and sharing one key made tuning the thread count
+     * silently change the task size as well.
+     */
+    public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.row.count.per.task";
+    public static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+    public static final String NO_EXPECTED_MUTATION = "No expected mutation";
+    public static final String ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
+    public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack";
+    public static final String ERROR_MESSAGE_MISSING_INDEX_ROW = "Missing index row";
+
+    // Rows per page; unbounded unless the scan carries the INDEX_REBUILD_PAGING attribute.
+    protected long pageSizeInRows = Long.MAX_VALUE;
+    protected int rowCountPerTask;
+    protected boolean hasMore;
+    protected int maxBatchSize;
+    // Serialized index maintainers from the scan (INDEX_PROTO_MD if present, otherwise INDEX_MD).
+    protected byte[] indexMetaData;
+    protected Scan scan;
+    protected RegionScanner innerScanner;
+    protected Region region;
+    // Only the first maintainer deserialized from indexMetaData is used.
+    protected IndexMaintainer indexMaintainer;
+    protected Table indexHTable;
+    protected TaskRunner pool;
+    protected TaskBatch<Boolean> tasks;
+    protected String exceptionMessage;
+    protected HTableFactory hTableFactory;
+    // TTL of the index table's first column family, as returned by getTimeToLive().
+    protected int indexTableTTL;
+    protected long maxLookBackInMills;
+    protected IndexToolVerificationResult verificationResult;
+    protected IndexVerificationResultRepository verificationResultRepository;
+
+    /**
+     * Performs the setup shared by all global index rebuild/verify scanners.
+     *
+     * @param innerScanner the data-region scanner being wrapped
+     * @param region       the data region being scanned
+     * @param scan         the client scan, carrying index metadata and rebuild/verify attributes
+     * @param env          coprocessor environment, used for configuration and HTable access
+     * @throws IOException if setting up access to the index table fails
+     */
+    public GlobalIndexRegionScanner(RegionScanner innerScanner, final Region region, final Scan scan,
+            final RegionCoprocessorEnvironment env) throws IOException {
+        super(innerScanner);
+        final Configuration config = env.getConfiguration();
+        if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+            pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+                    QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+        }
+        maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        if (indexMetaData == null) {
+            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        }
+        List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+        indexMaintainer = maintainers.get(0);
+        this.scan = scan;
+        this.innerScanner = innerScanner;
+        this.region = region;
+        verificationResult = new IndexToolVerificationResult(scan);
+        // These resources are created unconditionally for every scanner built on this class.
+        // Subclasses are responsible for releasing indexHTable, hTableFactory, pool and
+        // verificationResultRepository when they close.
+        hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
+        indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+        indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config);
+        verificationResultRepository =
+                new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
+        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                new ThreadPoolBuilder("IndexVerify", config)
+                        .setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
+                                DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS)
+                        .setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+        rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
+                DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
+    }
+
+    /**
+     * {@link ValueGetter} backed by a single {@link Put}.
+     * <p>
+     * For each column it returns the value of the first cell the Put holds, ignoring the
+     * requested timestamp. The returned {@link ImmutableBytesWritable} is one shared instance
+     * that is overwritten on every call, so callers must consume it before calling again.
+     */
+    public static class SimpleValueGetter implements ValueGetter {
+        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+        final Put put;
+
+        public SimpleValueGetter(final Put put) {
+            this.put = put;
+        }
+
+        /** Returns the first cell value for {@code ref} in the Put, or null if there is none. */
+        @Override
+        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
+            List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
+            if (cellList == null || cellList.isEmpty()) {
+                return null;
+            }
+            Cell cell = cellList.get(0);
+            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            return valuePtr;
+        }
+
+        @Override
+        public byte[] getRowKey() {
+            return put.getRow();
+        }
+    }
+
+    /** Builds the index row key that {@code indexMaintainer} derives from the data row {@code dataRow}. */
+    public static byte[] getIndexRowKey(IndexMaintainer indexMaintainer, final Put dataRow) {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        return indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
+                null, null, HConstants.LATEST_TIMESTAMP);
+    }
+
+    /**
+     * Returns the largest cell timestamp in {@code m}.
+     * Returns 0 if {@code m} has no cells; null per-family cell lists are skipped.
+     */
+    public static long getMaxTimestamp(Mutation m) {
+        long ts = 0;
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            if (cells == null) {
+                continue;
+            }
+            for (Cell cell : cells) {
+                if (ts < cell.getTimestamp()) {
+                    ts = cell.getTimestamp();
+                }
+            }
+        }
+        return ts;
+    }
+
+    /**
+     * Returns the timestamp of the first cell found in {@code m}.
+     * Presumably used where all cells of the mutation share one timestamp.
+     *
+     * @throws IllegalStateException if {@code m} contains no cells
+     */
+    public static long getTimestamp(Mutation m) {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        throw new IllegalStateException("No cell found");
+    }
+
+    /**
+     * Returns true if {@code tsToCheck} is older than {@code tableTTL} before {@code currentTime}.
+     * {@code tableTTL} is in seconds (hence the {@code * 1000}); timestamps are in milliseconds.
+     * Always false for {@link HConstants#FOREVER}.
+     */
+    protected static boolean isTimestampBeforeTTL(long tableTTL, long currentTime, long tsToCheck) {
+        if (tableTTL == HConstants.FOREVER) {
+            return false;
+        }
+        return tsToCheck < (currentTime - tableTTL * 1000);
+    }
+
+    /**
+     * Returns true if {@code tsToCheck} falls before the max lookback window ending at
+     * {@code currentTime}. Returns false when max lookback is not enabled.
+     * <p>
+     * NOTE(review): the pre-refactor IndexRebuildRegionScanner version of this check returned
+     * {@code true} when max lookback was disabled. Confirm the flip to {@code false} is intended,
+     * since it decides whether rows are reported as "beyond max lookback".
+     */
+    protected static boolean isTimestampBeyondMaxLookBack(long maxLookBackInMills,
+            long currentTime, long tsToCheck) {
+        if (!ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills)) {
+            return false;
+        }
+        return tsToCheck < (currentTime - maxLookBackInMills);
+    }
+
+    /**
+     * Returns the largest cell timestamp across the Put and the Delete of {@code pair}.
+     * Either side may be null; a null side contributes 0.
+     */
+    protected static long getMaxTimestamp(Pair<Put, Delete> pair) {
+        Put put = pair.getFirst();
+        long putTs = (put == null) ? 0 : getMaxTimestamp(put);
+        Delete del = pair.getSecond();
+        // The delete's timestamp must be kept separately; previously it overwrote the put's.
+        long deleteTs = (del == null) ? 0 : getMaxTimestamp(del);
+        return Math.max(putTs, deleteTs);
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 8b7e3f2..e767a23 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -24,8 +24,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import java.io.IOException;
@@ -54,12 +52,10 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.hbase.regionserver.ScanInfoUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@@ -67,22 +63,18 @@ import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.Task;
import org.apache.phoenix.hbase.index.parallel.TaskBatch;
-import org.apache.phoenix.hbase.index.parallel.TaskRunner;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
-import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
-import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.KeyRange;
@@ -98,86 +90,50 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
-public class IndexRebuildRegionScanner extends BaseRegionScanner {
- public static final String ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK = "Missing index row beyond maxLookBack";
+public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
- public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
- private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
- public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.threads.max";
- private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
- public static final String NO_EXPECTED_MUTATION = "No expected mutation";
- public static final String
- ACTUAL_MUTATION_IS_NULL_OR_EMPTY = "actualMutationList is null or empty";
- private long pageSizeInRows = Long.MAX_VALUE;
- private int rowCountPerTask;
- private boolean hasMore;
- private final int maxBatchSize;
- private UngroupedAggregateRegionObserver.MutationList mutations;
+
private final long maxBatchSizeBytes;
private final long blockingMemstoreSize;
private final byte[] clientVersionBytes;
- private byte[] indexMetaData;
private boolean useProto = true;
- private Scan scan;
- private RegionScanner innerScanner;
- private Region region;
- private IndexMaintainer indexMaintainer;
- private byte[] indexRowKey = null;
- private Table indexHTable = null;
+ private byte[] indexRowKey;
private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
private boolean verify = false;
private Map<byte[], List<Mutation>> indexKeyToMutationMap;
private Map<byte[], Pair<Put, Delete>> dataKeyToMutationMap;
- private TaskRunner pool;
- private TaskBatch<Boolean> tasks;
- private String exceptionMessage;
private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
- private HTableFactory hTableFactory;
- private int indexTableTTL = 0;
- private IndexToolVerificationResult verificationResult;
+ protected UngroupedAggregateRegionObserver.MutationList mutations;
private boolean isBeforeRebuilt = true;
private boolean partialRebuild = false;
- private int singleRowRebuildReturnCode;
+ private int singleRowRebuildReturnCode;
private Map<byte[], NavigableSet<byte[]>> familyMap;
private byte[][] viewConstants;
- private IndexVerificationResultRepository verificationResultRepository;
private IndexVerificationOutputRepository verificationOutputRepository;
- private long maxLookBackInMills;
@VisibleForTesting
public IndexRebuildRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env,
UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
- super(innerScanner);
+ super(innerScanner, region, scan, env);
final Configuration config = env.getConfiguration();
- if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
- pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
- QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
- } else {
+ if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) == null) {
partialRebuild = true;
}
- maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
- mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+ mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, config);
clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
if (indexMetaData == null) {
useProto = false;
- indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
}
- List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
- indexMaintainer = maintainers.get(0);
- this.scan = scan;
familyMap = scan.getFamilyMap();
if (familyMap.isEmpty()) {
familyMap = null;
}
-
- this.innerScanner = innerScanner;
- this.region = region;
this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
if (indexRowKey != null) {
@@ -186,17 +142,10 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
if (valueBytes != null) {
- verificationResult = new IndexToolVerificationResult(scan);
verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
if (verifyType != IndexTool.IndexVerifyType.NONE) {
verify = true;
viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
- // Create the following objects only for rebuilds by IndexTool
- hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
- indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
- indexTableTTL = indexHTable.getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
- verificationResultRepository =
- new IndexVerificationResultRepository(indexMaintainer.getIndexTableName(), hTableFactory);
verificationOutputRepository =
new IndexVerificationOutputRepository(indexMaintainer.getIndexTableName(), hTableFactory);
indexKeyToMutationMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
@@ -206,12 +155,8 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
- rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
- DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
}
}
-
- maxLookBackInMills = ScanInfoUtil.getMaxLookbackInMillis(config);
}
private void setReturnCodeForSingleRowRebuild() throws IOException {
@@ -310,41 +255,9 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return 0;
}
- public static class SimpleValueGetter implements ValueGetter {
- final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
- final Put put;
-
- public SimpleValueGetter(final Put put) {
- this.put = put;
- }
-
- @Override
- public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
- List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
- if (cellList == null || cellList.isEmpty()) {
- return null;
- }
- Cell cell = cellList.get(0);
- valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- return valuePtr;
- }
-
- @Override
- public byte[] getRowKey() {
- return put.getRow();
- }
-
- }
-
- public byte[] getIndexRowKey(final Put dataRow) throws IOException {
- ValueGetter valueGetter = new SimpleValueGetter(dataRow);
- byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
- null, null, HConstants.LATEST_TIMESTAMP);
- return builtIndexRowKey;
- }
private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
- byte[] builtIndexRowKey = getIndexRowKey(put);
+ byte[] builtIndexRowKey = getIndexRowKey(indexMaintainer, put);
if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
indexRowKey, 0, indexRowKey.length) != 0) {
return false;
@@ -366,21 +279,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
region.getRegionInfo().getTable().getName(), isBeforeRebuilt);
}
- private static long getMaxTimestamp(Mutation m) {
- long ts = 0;
- for (List<Cell> cells : m.getFamilyCellMap().values()) {
- if (cells == null) {
- continue;
- }
- for (Cell cell : cells) {
- if (ts < cell.getTimestamp()) {
- ts = cell.getTimestamp();
- }
- }
- }
- return ts;
- }
-
private static Cell getCell(Mutation m, byte[] family, byte[] qualifier) {
List<Cell> cellList = m.getFamilyCellMap().get(family);
if (cellList == null) {
@@ -529,17 +427,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return false;
}
- private boolean isDeleteFamilyVersion(Mutation mutation) {
- for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
- for (Cell cell : cells) {
- if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.DeleteFamilyVersion) {
- return true;
- }
- }
- }
- return false;
- }
-
@VisibleForTesting
public List<Mutation> prepareActualIndexMutations(Result indexRow) throws IOException {
Put put = null;
@@ -724,7 +611,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
// get a value back from index if it has already expired between our rebuild and
// verify
// TODO: have a metric to update for these cases
- if (isTimestampBeforeTTL(currentTime, getTimestamp(expected))) {
+ if (isTimestampBeforeTTL(indexTableTTL, currentTime, getTimestamp(expected))) {
verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
return true;
}
@@ -782,7 +669,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return true;
}
- if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){
+ if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(expectedMutationList.get(expectedIndex)))){
if (expectedIndex > 0) {
// if current expected index mutation is beyond max look back window, we only need to make sure its latest
// mutation is a matching one, as an SCN query is required.
@@ -822,20 +709,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
}
- private static long getMaxTimestamp(Pair<Put, Delete> pair) {
- Put put = pair.getFirst();
- long ts1 = 0;
- if (put != null) {
- ts1 = getMaxTimestamp(put);
- }
- Delete del = pair.getSecond();
- long ts2 = 0;
- if (del != null) {
- ts1 = getMaxTimestamp(del);
- }
- return (ts1 > ts2) ? ts1 : ts2;
- }
-
private void verifyIndexRows(List<KeyRange> keys,
IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
List<KeyRange> invalidKeys = new ArrayList<>();
@@ -847,6 +720,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
indexScan.setRaw(true);
indexScan.setMaxVersions();
+ indexScan.setCacheBlocks(false);
try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
KeyRange keyRange = PVarbinary.INSTANCE.getKeyRange(result.getRow());
@@ -870,7 +744,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
KeyRange keyRange = itr.next();
byte[] key = keyRange.getLowerRange();
List<Mutation> mutationList = indexKeyToMutationMap.get(key);
- if (isTimestampBeforeTTL(currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
+ if (isTimestampBeforeTTL(indexTableTTL, currentTime, getTimestamp(mutationList.get(mutationList.size() - 1)))) {
itr.remove();
verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
}
@@ -886,13 +760,13 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
}
long currentTime = EnvironmentEdgeManager.currentTimeMillis();
String errorMsg;
- if (isTimestampBeyondMaxLookBack(currentTime, getTimestamp(mutation))){
+ if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, getTimestamp(mutation))){
errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW_BEYOND_MAX_LOOKBACK;
verificationPhaseResult.
setBeyondMaxLookBackMissingIndexRowCount(verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1);
}
else {
- errorMsg = "Missing index row";
+ errorMsg = ERROR_MESSAGE_MISSING_INDEX_ROW;
verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1);
}
byte[] dataKey = indexMaintainer.buildDataRowKey(new ImmutableBytesWritable(keyRange.getLowerRange()), viewConstants);
@@ -906,19 +780,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
keys.addAll(invalidKeys);
}
- private boolean isTimestampBeforeTTL(long currentTime, long tsToCheck) {
- if (indexTableTTL == HConstants.FOREVER) {
- return false;
- }
- return tsToCheck < (currentTime - (long) indexTableTTL * 1000);
- }
-
- private boolean isTimestampBeyondMaxLookBack(long currentTime, long tsToCheck){
- if (!ScanInfoUtil.isMaxLookbackTimeEnabled(maxLookBackInMills))
- return true;
- return tsToCheck < (currentTime - maxLookBackInMills);
- }
-
private void addVerifyTask(final List<KeyRange> keys,
final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
tasks.add(new Task<Boolean>() {
@@ -1081,15 +942,6 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
return set.contains(qualifier);
}
- public static long getTimestamp(Mutation m) {
- for (List<Cell> cells : m.getFamilyCellMap().values()) {
- for (Cell cell : cells) {
- return cell.getTimestamp();
- }
- }
- throw new IllegalStateException("No cell found");
- }
-
/**
* This is to reorder the mutations in ascending order by the tuple of timestamp and mutation type where
* put comes before delete
@@ -1248,7 +1100,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
if (mutation.getFamilyCellMap().size() != 0) {
// Add this put on top of the current data row state to get the next data row state
Put nextDataRow = (currentDataRowState == null) ? new Put((Put)mutation) : applyNew((Put)mutation, currentDataRowState);
- ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRow);
+ ValueGetter nextDataRowVG = new SimpleValueGetter(nextDataRow);
Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
indexMutations.add(indexPut);
// Delete the current index row if the new index key is different than the current one
@@ -1288,7 +1140,7 @@ public class IndexRebuildRegionScanner extends BaseRegionScanner {
currentDataRowState = null;
indexRowKeyForCurrentDataRow = null;
} else {
- ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+ ValueGetter nextDataRowVG = new SimpleValueGetter(nextDataRowState);
Put indexPut = prepareIndexPutForRebuid(indexMaintainer, rowKeyPtr, nextDataRowVG, ts);
indexMutations.add(indexPut);
// Delete the current index row if the new index key is different than the current one
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
new file mode 100644
index 0000000..88bac86
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.EMPTY_COLUMN_VALUE_BYTES;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+
+public class IndexerRegionScanner extends GlobalIndexRegionScanner {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IndexerRegionScanner.class);
+ protected Map<byte[], Put> indexKeyToDataPutMap;
+
+    /**
+     * Creates a verify-only scanner over the given data region. Shared setup (index metadata,
+     * index table, thread pool, result repository) is done by the superclass constructor.
+     */
+    IndexerRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan,
+            final RegionCoprocessorEnvironment env) throws IOException {
+        super(innerScanner, region, scan, env);
+        // Keyed by index row key. byte[] has identity equals/hashCode, so a sorted map with a
+        // byte-wise comparator is used instead of a hash map.
+        this.indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+    }
+
+    /** Returns the region info of the data region this scanner wraps. */
+    @Override
+    public HRegionInfo getRegionInfo() {
+        final Region scannedRegion = this.region;
+        return scannedRegion.getRegionInfo();
+    }
+
+ @Override
+ public boolean isFilterDone() { return false; }
+
+    /**
+     * Closes the wrapped region scanner and records this region's verification result in the
+     * index tool result table (as an {@code ONLY} verification).
+     * <p>
+     * It then releases the verification thread pool, the index table, the result repository and
+     * the HTable factory. Every release step runs even if closing the inner scanner, logging the
+     * result, or an earlier release step throws.
+     *
+     * @throws IOException from the inner scanner, the result logging, or resource cleanup
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            // Inside the try so a failing inner close cannot leak the resources released below.
+            innerScanner.close();
+            verificationResultRepository.logToIndexToolResultTable(verificationResult,
+                    IndexTool.IndexVerifyType.ONLY, region.getRegionInfo().getRegionName());
+        } finally {
+            this.pool.stop("IndexerRegionScanner is closing");
+            try {
+                indexHTable.close();
+            } finally {
+                try {
+                    verificationResultRepository.close();
+                } finally {
+                    // Shut the factory down last: the index table and the result repository
+                    // were both obtained from it.
+                    hTableFactory.shutdown();
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks whether {@code indexRow}, as read from the index table, matches the index row that
+     * {@code indexMaintainer} builds from {@code dataRow}.
+     * <p>
+     * The expected row is stamped with the data row's max timestamp and includes the empty
+     * column. It is then compared against the actual row as follows:
+     * <ul>
+     *   <li>An expected cell missing from the actual row is tolerated only if it has expired
+     *       under the index table TTL.</li>
+     *   <li>Actual cells older than the expected timestamp are skipped.</li>
+     *   <li>A value or timestamp mismatch on a cell beyond the max lookback window is counted
+     *       in {@code verificationPhaseResult} and skipped.</li>
+     * </ul>
+     * NOTE(review): skipped cells are not counted as matches, so such a row still fails the
+     * final cell-count check, and the beyond-max-lookback counter is incremented per cell
+     * rather than per row. Confirm this is intended.
+     *
+     * @param indexRow                the index row read from the index table
+     * @param dataRow                 the data row state used to build the expected index row
+     * @param verificationPhaseResult accumulator for beyond-max-lookback counts
+     * @return true if every matched cell agrees and the actual row has no extra cells
+     * @throws IOException if building the expected index row fails
+     */
+    private boolean verifySingleIndexRow(Result indexRow, final Put dataRow,
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        long ts = getMaxTimestamp(dataRow);
+        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
+
+        if (indexPut == null) {
+            // This means the data row does not have any covered column values
+            indexPut = new Put(indexRow.getRow());
+        }
+        // Add the empty column
+        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, EMPTY_COLUMN_VALUE_BYTES);
+
+        int cellCount = 0;
+        long currentTime = EnvironmentEdgeManager.currentTime();
+        for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+            if (cells == null) {
+                // Skip only this family; a null list must not abort checking the other families.
+                continue;
+            }
+            for (Cell expectedCell : cells) {
+                byte[] family = CellUtil.cloneFamily(expectedCell);
+                byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+                Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
+                if (actualCell == null) {
+                    // The expected cell may have expired under the index table TTL between the
+                    // data read and this index read; such a cell is not required to be present.
+                    if (isTimestampBeforeTTL(indexTableTTL, currentTime, expectedCell.getTimestamp())) {
+                        continue;
+                    }
+                    return false;
+                }
+                if (actualCell.getTimestamp() < ts) {
+                    // Skip older cells since a Phoenix index row is composed of cells with the same timestamp
+                    continue;
+                }
+                // Check all columns
+                if (!CellUtil.matchingValue(actualCell, expectedCell) || actualCell.getTimestamp() != ts) {
+                    // Mismatches on cells outside the max lookback window are counted, not failed.
+                    if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, actualCell.getTimestamp())) {
+                        verificationPhaseResult
+                                .setBeyondMaxLookBackInvalidIndexRowCount(verificationPhaseResult
+                                        .getBeyondMaxLookBackInvalidIndexRowCount() + 1);
+                        continue;
+                    }
+                    return false;
+                }
+                cellCount++;
+            }
+        }
+        // Every actual cell must be accounted for by a matching expected cell.
+        return cellCount == indexRow.rawCells().length;
+    }
+
+    /**
+     * Verifies one batch of index rows against the data-table Puts they were derived from.
+     *
+     * <p>The index rows for {@code keys} are fetched with a point-lookup skip scan limited to
+     * the data scan's time range. Each returned index row is checked with
+     * {@code verifySingleIndexRow} and counted as valid or invalid, and its data row is
+     * removed from {@code perTaskDataKeyToDataPutMap}. Every data row still left afterwards had
+     * no index row and is classified as expired (past TTL, removed from the map), missing
+     * beyond max lookback, or missing.
+     *
+     * @param keys point-lookup ranges of the expected index row keys
+     * @param perTaskDataKeyToDataPutMap this batch's data Puts keyed by data row key; entries
+     *        are removed as they are accounted for
+     * @param verificationPhaseResult per-task counters updated by this method
+     * @throws IOException if the index scan fails or an index row has no buffered data row
+     */
+    private void verifyIndexRows(List<KeyRange> keys, Map<byte[], Put> perTaskDataKeyToDataPutMap,
+            IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan indexScan = new Scan();
+        indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        scanRanges.initializeScan(indexScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        indexScan.setFilter(skipScanFilter);
+        indexScan.setCacheBlocks(false);
+        try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
+            for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
+                Put dataPut = indexKeyToDataPutMap.get(result.getRow());
+                if (dataPut == null) {
+                    // This should never happen
+                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                if (verifySingleIndexRow(result, dataPut, verificationPhaseResult)) {
+                    verificationPhaseResult.setValidIndexRowCount(verificationPhaseResult.getValidIndexRowCount() + 1);
+                } else {
+                    verificationPhaseResult.setInvalidIndexRowCount(verificationPhaseResult.getInvalidIndexRowCount() + 1);
+                }
+                perTaskDataKeyToDataPutMap.remove(dataPut.getRow());
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(indexHTable.getName().toString(), t);
+        }
+        // Classify the expected index rows the scan did not return. A single clock reading is
+        // shared by the TTL and max-lookback checks so both agree on "now" for every entry.
+        long currentTime = EnvironmentEdgeManager.currentTime();
+        Iterator<Entry<byte[], Put>> itr = perTaskDataKeyToDataPutMap.entrySet().iterator();
+        while (itr.hasNext()) {
+            long ts = getMaxTimestamp(itr.next().getValue());
+            if (isTimestampBeforeTTL(indexTableTTL, currentTime, ts)) {
+                // Expired due to TTL: not treated as missing.
+                itr.remove();
+                verificationPhaseResult.setExpiredIndexRowCount(verificationPhaseResult.getExpiredIndexRowCount() + 1);
+            } else if (isTimestampBeyondMaxLookBack(maxLookBackInMills, currentTime, ts)) {
+                // Beyond max lookback: the index row may have been compacted away.
+                verificationPhaseResult.setBeyondMaxLookBackMissingIndexRowCount(
+                        verificationPhaseResult.getBeyondMaxLookBackMissingIndexRowCount() + 1);
+            } else {
+                verificationPhaseResult.setMissingIndexRowCount(verificationPhaseResult.getMissingIndexRowCount() + 1);
+            }
+        }
+    }
+
+    /**
+     * Queues onto {@code tasks} a task that verifies one batch of index rows via
+     * {@code verifyIndexRows}. The task returns {@code Boolean.TRUE} on success; any exception
+     * propagates to the task runner.
+     *
+     * @param keys point-lookup ranges of the batch's index row keys
+     * @param perTaskDataKeyToDataPutMap the batch's data Puts keyed by data row key
+     * @param verificationPhaseResult counters the task accumulates into
+     */
+    private void addVerifyTask(final List<KeyRange> keys, final Map<byte[], Put> perTaskDataKeyToDataPutMap,
+            final IndexToolVerificationResult.PhaseResult verificationPhaseResult) {
+        tasks.add(new Task<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                // Do not start the batch if this worker thread has already been interrupted.
+                // NOTE(review): exceptionMessage is shared by all tasks and read by
+                // parallelizeIndexVerify; confirm its visibility across threads is adequate.
+                if (Thread.currentThread().isInterrupted()) {
+                    exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                verifyIndexRows(keys, perTaskDataKeyToDataPutMap, verificationPhaseResult);
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+    /**
+     * Splits {@code indexKeyToDataPutMap} into batches of at most {@code rowCountPerTask}
+     * rows, verifies the batches concurrently on {@code pool}, and adds every batch's counters
+     * to {@code verificationPhaseResult}.
+     *
+     * @param verificationPhaseResult accumulator for the counters of all batches
+     * @throws IOException if any verification task failed
+     */
+    private void parallelizeIndexVerify(IndexToolVerificationResult.PhaseResult verificationPhaseResult) throws IOException {
+        int taskCount = (indexKeyToDataPutMap.size() + rowCountPerTask - 1) / rowCountPerTask;
+        tasks = new TaskBatch<>(taskCount);
+        // One PhaseResult per submitted task. Results are created only when a batch is actually
+        // submitted, so no empty trailing result appears when the row count is an exact
+        // multiple of rowCountPerTask.
+        List<IndexToolVerificationResult.PhaseResult> verificationPhaseResultList = new ArrayList<>(taskCount);
+        List<KeyRange> keys = new ArrayList<>(rowCountPerTask);
+        Map<byte[], Put> perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+
+        for (Map.Entry<byte[], Put> entry : indexKeyToDataPutMap.entrySet()) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(entry.getKey()));
+            perTaskDataKeyToDataPutMap.put(entry.getValue().getRow(), entry.getValue());
+            if (keys.size() == rowCountPerTask) {
+                IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult =
+                        new IndexToolVerificationResult.PhaseResult();
+                verificationPhaseResultList.add(perTaskVerificationPhaseResult);
+                addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+                keys = new ArrayList<>(rowCountPerTask);
+                perTaskDataKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+            }
+        }
+        if (!keys.isEmpty()) {
+            IndexToolVerificationResult.PhaseResult perTaskVerificationPhaseResult =
+                    new IndexToolVerificationResult.PhaseResult();
+            verificationPhaseResultList.add(perTaskVerificationPhaseResult);
+            addVerifyTask(keys, perTaskDataKeyToDataPutMap, perTaskVerificationPhaseResult);
+        }
+
+        List<Boolean> taskResultList;
+        try {
+            LOGGER.debug("Waiting on index verify tasks to complete...");
+            taskResultList = this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+        }
+        for (Boolean result : taskResultList) {
+            if (result == null) {
+                // A null result marks a failed task; exceptionMessage holds the message a task
+                // last recorded before failing.
+                throw new IOException(exceptionMessage);
+            }
+        }
+        for (IndexToolVerificationResult.PhaseResult result : verificationPhaseResultList) {
+            verificationPhaseResult.add(result);
+        }
+    }
+
+    /**
+     * Verifies the index rows of the data rows buffered in {@code indexKeyToDataPutMap},
+     * records the counters as the "before" phase of a fresh per-page result, empties the
+     * buffer, and merges the page result into {@code verificationResult}.
+     */
+    private void verifyIndex() throws IOException {
+        IndexToolVerificationResult pageResult = new IndexToolVerificationResult(scan);
+        // Capture the page size before the buffer is cleared below.
+        pageResult.setScannedDataRowCount(indexKeyToDataPutMap.size());
+        IndexToolVerificationResult.PhaseResult beforePhase = new IndexToolVerificationResult.PhaseResult();
+        // This scanner only verifies and never rebuilds, so only the "before" phase is filled.
+        parallelizeIndexVerify(beforePhase);
+        pageResult.getBefore().add(beforePhase);
+        indexKeyToDataPutMap.clear();
+        verificationResult.add(pageResult);
+    }
+
+    /**
+     * Reads up to {@code pageSizeInRows} data rows from the inner scanner, buffers each as a
+     * Put keyed by its expected index row key, verifies the page, and emits one aggregate cell
+     * whose value is the number of data rows read.
+     *
+     * <p>The aggregate cell's row key is the row of the last data row read, or
+     * {@code UNGROUPED_AGG_ROW_KEY} when no row was read.
+     *
+     * @param results receives the single aggregate cell
+     * @return whether the inner scanner reported more rows
+     * @throws IOException if reading or verification fails, or (as a DoNotRetryIOException)
+     *         if a non-Put cell is encountered
+     */
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+        Cell lastCell = null;
+        int rowCount = 0;
+        region.startRegionOperation();
+        try {
+            synchronized (innerScanner) {
+                do {
+                    List<Cell> row = new ArrayList<>();
+                    hasMore = innerScanner.nextRaw(row);
+                    if (row.isEmpty()) {
+                        // Nothing returned this round; fall through to the loop condition.
+                        continue;
+                    }
+                    lastCell = row.get(0);
+                    Put put = null;
+                    for (Cell cell : row) {
+                        if (KeyValue.Type.codeToType(cell.getTypeByte()) != KeyValue.Type.Put) {
+                            throw new DoNotRetryIOException("Scan without raw found a deleted cell");
+                        }
+                        if (put == null) {
+                            put = new Put(CellUtil.cloneRow(cell));
+                        }
+                        put.add(cell);
+                    }
+                    rowCount++;
+                    indexKeyToDataPutMap.put(getIndexRowKey(indexMaintainer, put), put);
+                } while (hasMore && rowCount < pageSizeInRows);
+                verifyIndex();
+            }
+        } catch (IOException e) {
+            LOGGER.error(String.format("IOException during rebuilding: %s", Throwables.getStackTraceAsString(e)));
+            throw e;
+        } finally {
+            region.closeRegionOperation();
+            indexKeyToDataPutMap.clear();
+        }
+        byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+        byte[] aggRowKey = (lastCell == null) ? UNGROUPED_AGG_ROW_KEY : CellUtil.cloneRow(lastCell);
+        results.add(KeyValueUtil.newKeyValue(aggRowKey, SINGLE_COLUMN_FAMILY,
+                SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length));
+        return hasMore;
+    }
+
+    /** Returns the maximum result size configured on the scan this scanner serves. */
+    @Override
+    public long getMaxResultSize() {
+        return this.scan.getMaxResultSize();
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b9cdafd..2c93b3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -87,6 +87,7 @@ import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.exception.IndexWriteException;
@@ -101,6 +102,7 @@ import org.apache.phoenix.index.PhoenixIndexMetaData;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
@@ -120,7 +122,7 @@ import org.apache.phoenix.schema.stats.NoOpStatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
import org.apache.phoenix.schema.stats.StatisticsCollector;
import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
-import org.apache.phoenix.schema.stats.StatisticsScanner;
+
import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException;
import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
@@ -389,8 +391,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
@Override
protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
- RegionCoprocessorEnvironment env = c.getEnvironment();
- Region region = env.getRegion();
+ final RegionCoprocessorEnvironment env = c.getEnvironment();
+ final Region region = env.getRegion();
long ts = scan.getTimeRange().getMax();
boolean localIndexScan = ScanUtil.isLocalIndex(scan);
if (ScanUtil.isAnalyzeTable(scan)) {
@@ -408,7 +410,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
return collectStats(s, statsCollector, region, scan, env.getConfiguration());
}
} else if (ScanUtil.isIndexRebuild(scan)) {
- return rebuildIndices(s, region, scan, env);
+ return User.runAsLoginUser(new PrivilegedExceptionAction<RegionScanner>() {
+ @Override
+ public RegionScanner run() throws Exception {
+ return rebuildIndices(s, region, scan, env);
+ }
+ });
}
PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
@@ -1062,12 +1069,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
final RegionCoprocessorEnvironment env) throws IOException {
+ boolean oldCoproc = region.getTableDesc().hasCoprocessor(Indexer.class.getCanonicalName());
+ byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
+ IndexTool.IndexVerifyType verifyType = (valueBytes != null) ?
+ IndexTool.IndexVerifyType.fromValue(valueBytes):IndexTool.IndexVerifyType.NONE;
+ if(oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) {
+ return new IndexerRegionScanner(innerScanner, region, scan, env);
+ }
if (!scan.isRaw()) {
Scan rawScan = new Scan(scan);
rawScan.setRaw(true);
rawScan.setMaxVersions();
rawScan.getFamilyMap().clear();
rawScan.setFilter(null);
+ rawScan.setCacheBlocks(false);
for (byte[] family : scan.getFamilyMap().keySet()) {
rawScan.addFamily(family);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 6246697..40fc883 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -27,7 +27,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.ArrayListMultimap;
@@ -66,6 +65,7 @@ import org.apache.htrace.TraceScope;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.LockManager.RowLock;
@@ -78,7 +78,6 @@ import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
import org.apache.phoenix.index.IndexMaintainer;
@@ -88,7 +87,6 @@ import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.trace.TracingUtils;
import org.apache.phoenix.trace.util.NullSpan;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ServerUtil;
import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -648,7 +646,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
IndexMaintainer indexMaintainer = pair.getFirst();
HTableInterfaceReference hTableInterfaceReference = pair.getSecond();
if (nextDataRowState != null) {
- ValueGetter nextDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(nextDataRowState);
+ ValueGetter nextDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(nextDataRowState);
Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
nextDataRowVG, rowKeyPtr, ts, null, null);
if (indexPut == null) {
@@ -666,7 +664,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get()));
// Delete the current index row if the new index key is different than the current one
if (currentDataRowState != null) {
- ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+ ValueGetter currentDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState);
byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
null, null, HConstants.LATEST_TIMESTAMP);
if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) {
@@ -677,7 +675,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
}
}
} else if (currentDataRowState != null) {
- ValueGetter currentDataRowVG = new IndexRebuildRegionScanner.SimpleValueGetter(currentDataRowState);
+ ValueGetter currentDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState);
byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
null, null, HConstants.LATEST_TIMESTAMP);
Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
index b736787..4baad43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexToolJobCounters.java
@@ -35,5 +35,5 @@ public enum PhoenixIndexToolJobCounters {
AFTER_REBUILD_MISSING_INDEX_ROW_COUNT,
AFTER_REBUILD_INVALID_INDEX_ROW_COUNT,
AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT,
- AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT
+ AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT;
}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
index 5df041d..868e6ad 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/VerifySingleIndexRowTest.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.coprocessor.GlobalIndexRegionScanner;
import org.apache.phoenix.coprocessor.IndexRebuildRegionScanner;
import org.apache.phoenix.coprocessor.IndexToolVerificationResult;
import org.apache.phoenix.hbase.index.IndexRegionObserver;
@@ -269,7 +270,7 @@ public class VerifySingleIndexRowTest extends BaseConnectionlessQueryTest {
private void initializeGlobalMockitoSetup() throws IOException {
//setup
- when(rebuildScanner.getIndexRowKey(put)).thenCallRealMethod();
+ when(GlobalIndexRegionScanner.getIndexRowKey(indexMaintainer, put)).thenCallRealMethod();
when(rebuildScanner.prepareIndexMutations(put, delete)).thenCallRealMethod();
when(rebuildScanner.verifySingleIndexRow(Matchers.<Result>any(),
Matchers.<IndexToolVerificationResult.PhaseResult>any())).thenCallRealMethod();