You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/01/13 05:55:51 UTC
[phoenix] branch 4.x-HBase-1.4 updated: PHOENIX-5658 IndexTool to
verify index rows inline
This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
new fac0717 PHOENIX-5658 IndexTool to verify index rows inline
fac0717 is described below
commit fac07170be5931229e26fa04c8b1c78dd0578d65
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sun Jan 12 17:55:11 2020 -0800
PHOENIX-5658 IndexTool to verify index rows inline
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 168 +++++-
.../phoenix/compile/ServerBuildIndexCompiler.java | 5 -
.../coprocessor/BaseScannerRegionObserver.java | 3 +
.../coprocessor/IndexRebuildRegionScanner.java | 629 +++++++++++++++++++++
.../UngroupedAggregateRegionObserver.java | 236 +-------
.../phoenix/hbase/index/IndexRegionObserver.java | 17 +-
.../PhoenixServerBuildIndexInputFormat.java | 3 +-
.../apache/phoenix/mapreduce/index/IndexTool.java | 98 +++-
.../mapreduce/util/PhoenixConfigurationUtil.java | 14 +
.../apache/phoenix/query/QueryServicesOptions.java | 2 +-
10 files changed, 926 insertions(+), 249 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index ff1b354..0edc3c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -38,18 +38,26 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -369,6 +377,139 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
}
}
+    /**
+     * Reads the first row of the IndexTool output table, asserts that it was written for the given
+     * data and index tables, and returns its error message cell.
+     */
+    private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
+            throws Exception {
+        byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
+        byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName);
+        boolean dataTableNameCheck = false;
+        boolean indexTableNameCheck = false;
+        Cell errorMessageCell = null;
+        // Close both the table handle and the scanner; previously both were leaked on every call
+        try (Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+             ResultScanner scanner = hIndexTable.getScanner(new Scan())) {
+            // Only the first row is inspected, so tests must drop the output table between runs
+            Result result = scanner.next();
+            if (result != null) {
+                for (Cell cell : result.rawCells()) {
+                    assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                            IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0,
+                            IndexTool.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0);
+                    if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                            IndexTool.DATA_TABLE_NAME_BYTES, 0, IndexTool.DATA_TABLE_NAME_BYTES.length) == 0) {
+                        dataTableNameCheck = true;
+                        assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                                dataTableFullNameBytes, 0, dataTableFullNameBytes.length) == 0);
+                    } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                            IndexTool.INDEX_TABLE_NAME_BYTES, 0, IndexTool.INDEX_TABLE_NAME_BYTES.length) == 0) {
+                        indexTableNameCheck = true;
+                        assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                                indexTableFullNameBytes, 0, indexTableFullNameBytes.length) == 0);
+                    } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                            IndexTool.ERROR_MESSAGE_BYTES, 0, IndexTool.ERROR_MESSAGE_BYTES.length) == 0) {
+                        errorMessageCell = cell;
+                    }
+                }
+            }
+        }
+        assertTrue("IndexTool output row must contain the data table name, index table name and error message",
+                dataTableNameCheck && indexTableNameCheck && errorMessageCell != null);
+        return errorMessageCell;
+    }
+
+ @Test
+ public void testIndexToolVerifyAfterOption() throws Exception {
+ // This test is for building non-transactional global indexes with direct api
+ if (localIndex || transactional || !directApi || useSnapshot) {
+ return;
+ }
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String viewName = generateUniqueName();
+ String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+ + tableDDLOptions);
+ conn.commit();
+ conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+ conn.commit();
+ // Insert a row
+ conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+ conn.commit();
+ // Configure IndexRegionObserver to fail the first write phase. This should not
+ // lead to any change on index and thus the index verify during index rebuild should fail
+ IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+ // Run the index MR job and verify that the index table rebuild fails
+ runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+ null, -1, IndexTool.IndexVerifyType.AFTER);
+ // The index tool output table should report that there is a missing index row
+ Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
+ byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+ assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ expectedValueBytes, 0, expectedValueBytes.length) == 0);
+ IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+ admin.disableTable(indexToolOutputTable);
+ admin.deleteTable(indexToolOutputTable);
+ }
+ }
+
+ @Test
+ public void testIndexToolOnlyVerifyOption() throws Exception {
+ // This test is for building non-transactional global indexes with direct api
+ if (localIndex || transactional || !directApi || useSnapshot) {
+ return;
+ }
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+ // Insert a row
+ conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+ conn.commit();
+ conn.createStatement().execute(String.format(
+ "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
+ // Run the index MR job to only verify that each data table row has a corresponding index row
+ // IndexTool will go through each data table row and record the mismatches in the output table
+ // called PHOENIX_INDEX_TOOL
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+ null, 0, IndexTool.IndexVerifyType.ONLY);
+ Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
+ byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+ assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ expectedValueBytes, 0, expectedValueBytes.length) == 0);
+ // Delete the output table for the next test
+ Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+ TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+ admin.disableTable(indexToolOutputTable);
+ admin.deleteTable(indexToolOutputTable);
+ // Run the index tool to populate the index while verifying rows
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+ null, 0, IndexTool.IndexVerifyType.AFTER);
+ // Corrupt one cell by writing directly into the index table
+ conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')");
+ conn.commit();
+ // Run the index tool using the only-verify option to detect this mismatch between the data and index table
+ runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+ null, 0, IndexTool.IndexVerifyType.ONLY);
+ cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
+ expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
+ assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ expectedValueBytes, 0, expectedValueBytes.length) == 0);
+ admin.disableTable(indexToolOutputTable);
+ admin.deleteTable(indexToolOutputTable);
+ }
+ }
+
@Test
public void testIndexToolWithTenantId() throws Exception {
if (!useTenantId) { return;}
@@ -642,9 +783,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
actualExplainPlan.contains(expectedExplainPlan));
}
- public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
- String dataTable, String indxTable, String tenantId) {
- final List<String> args = Lists.newArrayList();
+ private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
+ String dataTable, String indxTable, String tenantId,
+ IndexTool.IndexVerifyType verifyType) {
+ List<String> args = Lists.newArrayList();
if (schemaName != null) {
args.add("-s");
args.add(schemaName);
@@ -656,6 +798,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
if (directApi) {
args.add("-direct");
}
+ args.add("-v" + verifyType.getValue()); // verify index rows inline
+
// Need to run this job in foreground for the test to be deterministic
args.add("-runfg");
if (useSnapshot) {
@@ -669,6 +813,12 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
args.add("-op");
args.add("/tmp/" + UUID.randomUUID().toString());
+ return args;
+ }
+
+ public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
+ String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
+ List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, tenantId, verifyType);
return args.toArray(new String[0]);
}
@@ -714,12 +864,20 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
String dataTableName, String indexTableName, String tenantId, int expectedStatus,
String... additionalArgs) throws Exception {
+        // Callers that do not choose a verification type get a plain rebuild with no inline verification
+        final IndexTool.IndexVerifyType noVerification = IndexTool.IndexVerifyType.NONE;
+        return runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                tenantId, expectedStatus, noVerification, additionalArgs);
+    }
+
+ public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+ String dataTableName, String indexTableName, String tenantId,
+ int expectedStatus, IndexTool.IndexVerifyType verifyType,
+ String... additionalArgs) throws Exception {
IndexTool indexingTool = new IndexTool();
Configuration conf = new Configuration(getUtility().getConfiguration());
conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
indexingTool.setConf(conf);
- final String[] cmdArgs =
- getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
+ final String[] cmdArgs = getArgValues(directApi, useSnapshot, schemaName, dataTableName,
+ indexTableName, tenantId, verifyType);
List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
cmdArgList.addAll(Arrays.asList(additionalArgs));
int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 7d1c1b4..1b482aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.compile;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
@@ -36,11 +35,7 @@ import org.apache.phoenix.schema.*;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.StringUtil;
-
-import com.google.common.collect.Lists;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 71e7546..6e0a1e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -86,6 +86,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
// The number of index rows to be rebuild in one RPC call
public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
+ // Scan attribute carrying the index verification type requested by IndexTool, as the bytes
+ // decoded by IndexTool.IndexVerifyType.fromValue(byte[])
+ public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
+
/*
* Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
* Needed for backward compatibility purposes. TODO: get rid of this in next major release.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
new file mode 100644
index 0000000..142871b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+
+public class IndexRebuildRegionScanner extends BaseRegionScanner {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
+    // Maximum number of threads in the "IndexVerify" pool that verifies rebuilt index rows
+    public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
+    private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
+    // Read into rowCountPerTask (presumably the number of rows each verification task handles).
+    // This key used to duplicate "index.verify.threads.max", so tuning the verify thread count also
+    // silently changed the per-task row count; it now has its own name.
+    public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.row.count.per.task";
+    private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+    // Unbounded unless the scan carries the INDEX_REBUILD_PAGING attribute
+    private long pageSizeInRows = Long.MAX_VALUE;
+    private int rowCountPerTask;
+    private boolean hasMore;
+    // Batching limits and buffer for the mutations committed through commitIfReady
+    private final int maxBatchSize;
+    private UngroupedAggregateRegionObserver.MutationList mutations;
+    private final long maxBatchSizeBytes;
+    private final long blockingMemstoreSize;
+    private final byte[] clientVersionBytes;
+    // Serialized index metadata; useProto records whether it came from INDEX_PROTO_MD or INDEX_MD
+    private byte[] indexMetaData;
+    private boolean useProto = true;
+    private Scan scan;
+    private RegionScanner innerScanner;
+    private Region region;
+    // Null when the scan is raw (the metadata is only deserialized for non-raw scans)
+    private IndexMaintainer indexMaintainer;
+    private byte[] indexRowKey = null;
+    // Opened only when verification (AFTER or ONLY) is requested
+    private Table indexHTable = null;
+    private Table outputHTable = null;
+    private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
+    private boolean verify = false;
+    private boolean onlyVerify = false;
+    // Index row key -> data table Put it was derived from, ordered by byte comparison
+    private Map<byte[], Put> indexKeyToDataPutMap;
+    private TaskRunner pool;
+    private TaskBatch<Boolean> tasks;
+    // Last failure recorded while logging to the output table or verifying an index row
+    private String exceptionMessage;
+    private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+    private RegionCoprocessorEnvironment env;
+    private HTableFactory hTableFactory;
+
+    /**
+     * Wraps {@code innerScanner} for an index rebuild over {@code region}. Batching limits come from the
+     * region server configuration; the index metadata, client version, index row key and optional
+     * verification type come from attributes on {@code scan}.
+     *
+     * @throws IOException if opening the tables needed for verification fails
+     */
+    IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
+                               final RegionCoprocessorEnvironment env,
+                               UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
+        super(innerScanner);
+        final Configuration conf = env.getConfiguration();
+        // Paging is opt-in: only a scan carrying the paging attribute gets a finite page size
+        boolean paged = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null;
+        if (paged) {
+            pageSizeInRows = conf.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+                    QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+        }
+        maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, conf);
+        clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        // Prefer the protobuf-serialized index metadata and fall back to the legacy attribute
+        byte[] protoIndexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        useProto = protoIndexMetaData != null;
+        indexMetaData = useProto ? protoIndexMetaData : scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        if (!scan.isRaw()) {
+            indexMaintainer = IndexMaintainer.deserialize(indexMetaData, true).get(0);
+        }
+        this.scan = scan;
+        this.innerScanner = innerScanner;
+        this.region = region;
+        this.env = env;
+        this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
+        indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
+        byte[] verifyTypeBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
+        if (verifyTypeBytes == null) {
+            return;
+        }
+        verifyType = IndexTool.IndexVerifyType.fromValue(verifyTypeBytes);
+        onlyVerify = verifyType == IndexTool.IndexVerifyType.ONLY;
+        verify = onlyVerify || verifyType == IndexTool.IndexVerifyType.AFTER;
+        if (verify) {
+            initVerificationResources(conf);
+        }
+    }
+
+    // Opens the index and output tables and starts the verification thread pool (AFTER / ONLY only)
+    private void initVerificationResources(Configuration conf) throws IOException {
+        hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
+        // NOTE(review): indexMaintainer is only set for non-raw scans; verifying a raw scan would NPE here
+        indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+        outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+        indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+        ThreadPoolBuilder verifyPoolBuilder = new ThreadPoolBuilder("IndexVerify", env.getConfiguration())
+                .setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY, DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS)
+                .setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY);
+        pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(verifyPoolBuilder, env));
+        tasks = new TaskBatch<>(DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS);
+        rowCountPerTask = conf.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
+                DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
+    }
+
+    /** Reports the region this rebuild scanner was created for. */
+    @Override
+    public HRegionInfo getRegionInfo() {
+        final Region scannedRegion = this.region;
+        return scannedRegion.getRegionInfo();
+    }
+
+ // NOTE(review): RegionScanner.isFilterDone() means "the filter will let no further rows through",
+ // yet this returns hasMore (not assigned in the visible part of this class) -- confirm the polarity.
+ @Override
+ public boolean isFilterDone() { return hasMore; }
+
+    /**
+     * Closes the wrapped scanner and, when verification was enabled, stops the verification pool and
+     * closes the index and output table handles. Every resource is released even if an earlier close
+     * throws; the last exception raised propagates.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            innerScanner.close();
+        } finally {
+            // Previously a failing inner close skipped these, leaking the thread pool and both tables
+            if (verify) {
+                try {
+                    this.pool.stop("IndexRebuildRegionScanner is closing");
+                } finally {
+                    try {
+                        indexHTable.close();
+                    } finally {
+                        outputHTable.close();
+                    }
+                }
+            }
+        }
+    }
+
+ private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+ m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+ m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+ m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+ BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+ m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+ // Since we're replaying existing mutations, it makes no sense to write them to the wal
+ m.setDurability(Durability.SKIP_WAL);
+ }
+
+    /**
+     * Builds a Delete that removes every index column absent from {@code row}, using the newest
+     * timestamp among the row's cells.
+     *
+     * @param row cells of one row; the Delete's row key is taken from the first cell
+     *            (assumes all cells share that row key -- TODO confirm against callers)
+     * @return null when the row holds all index columns plus the empty column, or when no column is
+     *         missing; otherwise a Delete covering the missing columns
+     */
+    private Delete generateDeleteMarkers(List<Cell> row) {
+        Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+        boolean complete = row.size() == allColumns.size() + 1;
+        if (complete) {
+            // Every index column plus the empty column is present, so nothing needs deleting
+            return null;
+        }
+        Set<ColumnReference> presentColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
+        long maxTs = 0;
+        for (Cell c : row) {
+            presentColumns.add(new ColumnReference(CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c)));
+            maxTs = Math.max(maxTs, c.getTimestamp());
+        }
+        Delete deleteMarkers = null;
+        for (ColumnReference ref : allColumns) {
+            if (presentColumns.contains(ref)) {
+                continue;
+            }
+            if (deleteMarkers == null) {
+                deleteMarkers = new Delete(CellUtil.cloneRow(row.get(0)));
+            }
+            deleteMarkers.addColumns(ref.getFamily(), ref.getQualifier(), maxTs);
+        }
+        return deleteMarkers;
+    }
+
+    /**
+     * Records every buffered data Put under the index row key derived from it, so the corresponding
+     * index rows can be looked up for verification. Non-Put mutations are skipped.
+     */
+    private void addToBeVerifiedIndexRows() throws IOException {
+        for (Mutation m : mutations) {
+            if (!(m instanceof Put)) {
+                continue;
+            }
+            Put dataPut = (Put) m;
+            indexKeyToDataPutMap.put(getIndexRowKey(dataPut), dataPut);
+        }
+    }
+
+    /**
+     * Commits the buffered mutations once ServerUtil.readyToCommit reports that the batch has reached
+     * its size limits, then clears the buffer.
+     *
+     * @param uuidValue UUID used for the current batch
+     * @return a freshly generated UUID if a batch was committed, otherwise {@code uuidValue}
+     */
+    private byte[] commitIfReady(byte[] uuidValue) throws IOException {
+        boolean ready = ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize,
+                maxBatchSizeBytes);
+        if (!ready) {
+            return uuidValue;
+        }
+        ungroupedAggregateRegionObserver.checkForRegionClosing();
+        ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+        byte[] nextUuid = ServerCacheClient.generateId();
+        if (verify) {
+            // Remember the committed data Puts, keyed by their index row key, before the buffer is cleared
+            addToBeVerifiedIndexRows();
+        }
+        mutations.clear();
+        return nextUuid;
+    }
+
+    /**
+     * ValueGetter backed by the cells of a single data table Put; used to build index row keys and
+     * index mutations from data rows. Static because it never touches the enclosing scanner's state,
+     * so it should not carry a hidden reference to it.
+     */
+    private static class SimpleValueGetter implements ValueGetter {
+        // Reused across calls: a returned pointer is only valid until the next getLatestValue call
+        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+        final Put put;
+
+        SimpleValueGetter(final Put put) {
+            this.put = put;
+        }
+
+        /**
+         * Returns the value of the first cell the Put holds for {@code ref}, or null when it holds
+         * none. The {@code ts} argument is not consulted.
+         */
+        @Override
+        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+            List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
+            if (cellList == null || cellList.isEmpty()) {
+                return null;
+            }
+            Cell cell = cellList.get(0);
+            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            return valuePtr;
+        }
+
+        @Override
+        public byte[] getRowKey() {
+            return put.getRow();
+        }
+    }
+
+    /** Derives the index row key for {@code dataRow} using the index maintainer. */
+    private byte[] getIndexRowKey(final Put dataRow) throws IOException {
+        return indexMaintainer.buildRowKey(new SimpleValueGetter(dataRow),
+                new ImmutableBytesWritable(dataRow.getRow()), null, null, HConstants.LATEST_TIMESTAMP);
+    }
+
+    /**
+     * Returns true when the index row key derived from {@code put} is byte-for-byte equal to
+     * {@code indexRowKey}.
+     */
+    private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
+        return Bytes.compareTo(getIndexRowKey(put), indexRowKey) == 0;
+    }
+
+ private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+ String errorMsg) {
+ logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
+ errorMsg, null, null);
+
+ }
+
+ private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+ String errorMsg, byte[] expectedValue, byte[] actualValue) {
+ final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
+ final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
+ final int PREFIX_LENGTH = 3;
+ final int TOTAL_PREFIX_LENGTH = 6;
+
+ try {
+ int longLength = Long.SIZE / Byte.SIZE;
+ byte[] rowKey;
+ if (dataRowKey != null) {
+ rowKey = new byte[longLength + dataRowKey.length];
+ Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
+ Bytes.putBytes(rowKey, longLength, dataRowKey, 0, dataRowKey.length);
+ } else {
+ rowKey = new byte[longLength];
+ Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
+ }
+ Put put = new Put(rowKey);
+ long scanMaxTs = scan.getTimeRange().getMax();
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
+ scanMaxTs, region.getRegionInfo().getTable().getName());
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
+ scanMaxTs, indexMaintainer.getIndexTableName());
+ if (dataRowKey != null) {
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
+ scanMaxTs, Bytes.toBytes(dataRowTs));
+ }
+ if (indexRowKey != null) {
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
+ scanMaxTs, indexRowKey);
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
+ scanMaxTs, Bytes.toBytes(indexRowTs));
+ }
+ byte[] errorMessageBytes;
+ if (expectedValue != null) {
+ errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
+ TOTAL_PREFIX_LENGTH];
+ Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
+ int length = errorMsg.length();
+ Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+ length += PREFIX_LENGTH;
+ Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
+ length += expectedValue.length;
+ Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+ length += PREFIX_LENGTH;
+ Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
+
+ }
+ else {
+ errorMessageBytes = Bytes.toBytes(errorMsg);
+ }
+ put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+ outputHTable.put(put);
+ } catch (IOException e) {
+ exceptionMessage = "LogToIndexToolOutputTable failed " + e;
+ }
+ }
+
+    /** Returns the newest timestamp among the result's cells, or 0 when it has none. */
+    private long getMaxTimestamp(Result result) {
+        long newestTs = 0;
+        for (Cell c : result.rawCells()) {
+            newestTs = Math.max(newestTs, c.getTimestamp());
+        }
+        return newestTs;
+    }
+
+ private void verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
+ ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+ long ts = 0;
+ for (List<Cell> cells : dataRow.getFamilyCellMap().values()) {
+ if (cells == null) {
+ break;
+ }
+ for (Cell cell : cells) {
+ if (ts < cell.getTimestamp()) {
+ ts = cell.getTimestamp();
+ }
+ }
+ }
+ Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+ valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
+ if (indexPut == null) {
+ // This means the index row does not have any covered columns. We just need to check if the index row
+ // has only one cell (which is the empty column cell)
+ if (indexRow.rawCells().length == 1) {
+ return;
+ }
+ String errorMsg = "Expected to find only empty column cell but got "
+ + indexRow.rawCells().length;
+ logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+ if (onlyVerify) {
+ return;
+ }
+ exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
+ throw new IOException(exceptionMessage);
+ }
+ else {
+ // Remove the empty column prepared by Index codec as we need to change its value
+ removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier());
+ }
+ // Add the empty column
+ indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+ indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
+ int cellCount = 0;
+ for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+ if (cells == null) {
+ break;
+ }
+ for (Cell expectedCell : cells) {
+ byte[] family = CellUtil.cloneFamily(expectedCell);
+ byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+ Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
+ if (actualCell == null) {
+ String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
+ Bytes.toString(qualifier);
+ logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+ if (onlyVerify) {
+ return;
+ }
+ exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
+ throw new IOException(exceptionMessage);
+ }
+ // Check all columns
+ if (!CellUtil.matchingValue(actualCell, expectedCell)) {
+ String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
+ Bytes.toString(qualifier);
+ logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+ errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
+ if (onlyVerify) {
+ return;
+ }
+ exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
+ throw new IOException(exceptionMessage);
+ }
+ cellCount++;
+ }
+ }
+ if (cellCount != indexRow.rawCells().length) {
+ String errorMsg = "Expected to find " + cellCount + " cells but got "
+ + indexRow.rawCells().length + " cells";
+ logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+ if (!onlyVerify) {
+ exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
+ throw new IOException(exceptionMessage);
+ }
+ }
+ }
+
+    /**
+     * Verifies the index rows for one batch of index row keys.
+     * <p>
+     * The keys are read back from the index table with a point-lookup skip scan, restricted to the
+     * rebuild scan's time range. Each returned index row is checked against its data row from
+     * {@code indexKeyToDataPutMap}. Afterwards, the number of verified rows is compared with the
+     * number of keys requested. Failures are logged to the IndexTool output table and, unless
+     * only-verify mode is on, reported by throwing.
+     *
+     * @param keys point-lookup key ranges of the index rows to verify
+     * @throws IOException if verification fails outside only-verify mode, or the index scan fails
+     */
+    private void verifyIndexRows(ArrayList<KeyRange> keys) throws IOException {
+        int expectedRowCount = keys.size();
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan indexScan = new Scan();
+        indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        scanRanges.initializeScan(indexScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        indexScan.setFilter(skipScanFilter);
+        int rowCount = 0;
+        try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
+            for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
+                Put dataPut = indexKeyToDataPutMap.get(result.getRow());
+                if (dataPut == null) {
+                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
+                    String errorMsg = "Missing data row";
+                    logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
+                    if (!onlyVerify) {
+                        throw new IOException(exceptionMessage);
+                    }
+                    // Only-verify mode: the failure has been logged. There is no data row to compare
+                    // against, so skip this index row rather than passing a null Put on.
+                    continue;
+                }
+                verifySingleIndexRow(result, dataPut);
+                rowCount++;
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(indexHTable.getName().toString(), t);
+        }
+        if (rowCount != expectedRowCount) {
+            String errorMsg = "Missing index rows - Expected: " + expectedRowCount +
+                    " Actual: " + rowCount;
+            exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
+            logToIndexToolOutputTable(null, null, 0, 0, errorMsg);
+            if (!onlyVerify) {
+                throw new IOException(exceptionMessage);
+            }
+        }
+    }
+
+    /**
+     * Queues a task on {@code tasks} that verifies the index rows for the given keys.
+     * <p>
+     * If the executing thread is already interrupted, the task records {@code exceptionMessage}
+     * and fails with an IOException instead of verifying. On success it returns {@code Boolean.TRUE}.
+     *
+     * @param keys point-lookup key ranges of the index rows the task should verify
+     */
+    private void addVerifyTask(final ArrayList<KeyRange> keys) {
+        tasks.add(new Task<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                if (Thread.currentThread().isInterrupted()) {
+                    exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                verifyIndexRows(keys);
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+ /**
+ * Scans up to one page (pageSizeInRows) of data table rows and replays each row as mutations.
+ * For every row, Put cells are gathered into one Put and all other cell types into one Delete.
+ * Both are queued in mutations with the attributes applied by setMutationAttributes. Unless
+ * only-verify mode is on, the queued mutations are committed to the region. When verify is set,
+ * the keys in indexKeyToDataPutMap are split into batches and checked by verify tasks run on the
+ * pool (presumably the map is filled by addToBeVerifiedIndexRows; confirm there).
+ * <p>
+ * Exactly one aggregate cell is appended to results. Its value is the number of rows processed,
+ * or a GlobalIndexChecker.RebuildReturnCode value when indexRowKey is set (single index row rebuild).
+ *
+ * @param results receives the aggregate row-count cell
+ * @return the hasMore flag reported by the inner scanner
+ * @throws IOException if scanning/committing fails, or a verify task failed (a null task result)
+ */
+ @Override
+ public boolean next(List<Cell> results) throws IOException {
+ int rowCount = 0;
+ region.startRegionOperation();
+ try {
+ // Id attached to the mutations of the current batch; commitIfReady may hand back a new one
+ byte[] uuidValue = ServerCacheClient.generateId();
+ synchronized (innerScanner) {
+ do {
+ List<Cell> row = new ArrayList<Cell>();
+ hasMore = innerScanner.nextRaw(row);
+ if (!row.isEmpty()) {
+ Put put = null;
+ Delete del = null;
+ for (Cell cell : row) {
+ if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+ if (put == null) {
+ put = new Put(CellUtil.cloneRow(cell));
+ setMutationAttributes(put, uuidValue);
+ mutations.add(put);
+ }
+ put.add(cell);
+ } else {
+ if (del == null) {
+ del = new Delete(CellUtil.cloneRow(cell));
+ setMutationAttributes(del, uuidValue);
+ mutations.add(del);
+ }
+ del.addDeleteMarker(cell);
+ }
+ }
+ // Only-verify mode: the row's mutations stay queued in mutations but nothing is
+ // committed here and no delete markers are generated; the row is just counted.
+ if (onlyVerify) {
+ rowCount++;
+ continue;
+ }
+ uuidValue = commitIfReady(uuidValue);
+ // For non-raw scans, generateDeleteMarkers may return an extra Delete for this row
+ if (!scan.isRaw()) {
+ Delete deleteMarkers = generateDeleteMarkers(row);
+ if (deleteMarkers != null) {
+ setMutationAttributes(deleteMarkers, uuidValue);
+ mutations.add(deleteMarkers);
+ uuidValue = commitIfReady(uuidValue);
+ }
+ }
+ if (indexRowKey != null) {
+ // GlobalIndexChecker passed the index row key. This is to build a single index row.
+ // Check if the data table row we have just scanned matches with the index row key.
+ // If not, there is no need to build the index row from this data table row,
+ // and just return zero row count.
+ if (checkIndexRow(indexRowKey, put)) {
+ rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+ }
+ else {
+ rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+ }
+ break;
+ }
+ rowCount++;
+ }
+
+ } while (hasMore && rowCount < pageSizeInRows);
+ // Commit what is left of the last batch; with verify set, hand the rows to addToBeVerifiedIndexRows
+ if (!mutations.isEmpty() && !onlyVerify) {
+ ungroupedAggregateRegionObserver.checkForRegionClosing();
+ ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+ if (verify) {
+ addToBeVerifiedIndexRows();
+ }
+ }
+ }
+ } catch (IOException e) {
+ hasMore = false;
+ LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
+ throw e;
+ } finally {
+ region.closeRegionOperation();
+ }
+ // Verification happens after the region operation has been closed
+ if (verify) {
+ if (onlyVerify) {
+ addToBeVerifiedIndexRows();
+ }
+ // Split the collected index row keys into tasks of at most rowCountPerTask point lookups each
+ ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask);
+ for (byte[] key : indexKeyToDataPutMap.keySet()) {
+ keys.add(PVarbinary.INSTANCE.getKeyRange(key));
+ if (keys.size() == rowCountPerTask) {
+ addVerifyTask(keys);
+ keys = new ArrayList<>(rowCountPerTask);
+ }
+ }
+ if (keys.size() > 0) {
+ addVerifyTask(keys);
+ }
+ List<Boolean> taskResultList = null;
+ try {
+ LOGGER.debug("Waiting on index verify tasks to complete...");
+ taskResultList = this.pool.submitUninterruptible(tasks);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+ } catch (EarlyExitFailure e) {
+ throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+ }
+ finally {
+ // Reset the per-page verification state whether or not the tasks completed
+ indexKeyToDataPutMap.clear();
+ tasks.getTasks().clear();
+ }
+ for (Boolean result : taskResultList) {
+ if (result == null) {
+ // there was a failure
+ // NOTE(review): exceptionMessage is written by verify tasks on pool threads; confirm
+ // it is safely published (e.g. volatile) so this thread sees the failing task's message.
+ throw new IOException(exceptionMessage);
+ }
+ }
+ }
+ // Report the row count (or the single-row rebuild return code) as the one aggregate cell
+ byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+ final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+ SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+ results.add(aggKeyValue);
+ return hasMore;
+ }
+
+    /**
+     * Reports the maximum result size configured on the scan this scanner was created with.
+     */
+    @Override
+    public long getMaxResultSize() {
+        final Scan sourceScan = this.scan;
+        return sourceScan.getMaxResultSize();
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 347dd01..b173045 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,7 +21,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
@@ -38,7 +37,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -46,7 +44,6 @@ import javax.annotation.concurrent.GuardedBy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -57,7 +54,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
@@ -81,7 +77,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.DataExceedsCapacityException;
@@ -98,7 +93,6 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.GlobalIndexChecker;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -247,7 +241,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
}
- private void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
+ public void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
try {
commitBatch(region, localRegionMutations, blockingMemstoreSize);
} catch (IOException e) {
@@ -275,7 +269,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
// flush happen which decrease the memstore size and then writes allowed on the region.
for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
try {
- checkForRegionClosingOrSplitting();
+ checkForRegionClosing();
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -324,7 +318,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
* a high chance that flush might not proceed and memstore won't be freed up.
* @throws IOException
*/
- private void checkForRegionClosingOrSplitting() throws IOException {
+ public void checkForRegionClosing() throws IOException {
synchronized (lock) {
if(isRegionClosingOrSplitting) {
lock.notifyAll();
@@ -382,7 +376,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private long getBlockingMemstoreSize(Region region, Configuration conf) {
+ public static long getBlockingMemstoreSize(Region region, Configuration conf) {
long flushSize = region.getTableDesc().getMemStoreFlushSize();
if (flushSize <= 0) {
@@ -414,7 +408,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
return collectStats(s, statsCollector, region, scan, env.getConfiguration());
}
} else if (ScanUtil.isIndexRebuild(scan)) {
- return rebuildIndices(s, region, scan, env.getConfiguration());
+ return rebuildIndices(s, region, scan, env);
}
PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
@@ -1061,226 +1055,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
}
}
- private class IndexRebuildRegionScanner extends BaseRegionScanner {
- private long pageSizeInRows = Long.MAX_VALUE;
- private boolean hasMore;
- private final int maxBatchSize;
- private MutationList mutations;
- private final long maxBatchSizeBytes;
- private final long blockingMemstoreSize;
- private final byte[] clientVersionBytes;
- private List<Cell> results = new ArrayList<Cell>();
- private byte[] indexMetaData;
- private boolean useProto = true;
- private Scan scan;
- private RegionScanner innerScanner;
- private Region region;
- private IndexMaintainer indexMaintainer;
- private byte[] indexRowKey = null;
-
- IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
- final Configuration config) {
- super(innerScanner);
- if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
- pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
- QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
- }
-
- maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
- mutations = new MutationList(maxBatchSize);
- maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
- QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
- blockingMemstoreSize = getBlockingMemstoreSize(region, config);
- clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
- indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
- if (indexMetaData == null) {
- useProto = false;
- indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
- }
- if (!scan.isRaw()) {
- List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
- indexMaintainer = maintainers.get(0);
- }
- this.scan = scan;
- this.innerScanner = innerScanner;
- this.region = region;
- indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
- }
-
- @Override
- public HRegionInfo getRegionInfo() {
- return region.getRegionInfo();
- }
-
- @Override
- public boolean isFilterDone() { return hasMore; }
-
- @Override
- public void close() throws IOException { innerScanner.close(); }
-
- private void setMutationAttributes(Mutation m, byte[] uuidValue) {
- m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
- m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
- BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
- m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
- // Since we're replaying existing mutations, it makes no sense to write them to the wal
- m.setDurability(Durability.SKIP_WAL);
- }
-
- private Delete generateDeleteMarkers(List<Cell> row) {
- Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
- if (row.size() == allColumns.size() + 1) {
- // We have all the columns for the index table plus the empty column. So, no delete marker is needed
- return null;
- }
- Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
- long ts = 0;
- for (Cell cell : row) {
- includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
- if (ts < cell.getTimestamp()) {
- ts = cell.getTimestamp();
- }
- }
- byte[] rowKey;
- Delete del = null;
- for (ColumnReference column : allColumns) {
- if (!includedColumns.contains(column)) {
- if (del == null) {
- Cell cell = row.get(0);
- rowKey = CellUtil.cloneRow(cell);
- del = new Delete(rowKey);
- }
- del.addColumns(column.getFamily(), column.getQualifier(), ts);
- }
- }
- return del;
- }
-
- private byte[] commitIfReady(byte[] uuidValue) throws IOException {
- if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
- checkForRegionClosingOrSplitting();
- commitBatchWithRetries(region, mutations, blockingMemstoreSize);
- uuidValue = ServerCacheClient.generateId();
- mutations.clear();
- }
- return uuidValue;
- }
-
- private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
- ValueGetter getter = new ValueGetter() {
- final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
-
- @Override
- public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
- List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
- if (cellList == null || cellList.isEmpty()) {
- return null;
- }
- Cell cell = cellList.get(0);
- valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- return valuePtr;
- }
-
- @Override
- public byte[] getRowKey() {
- return put.getRow();
- }
- };
- byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(put.getRow()),
- null, null, HConstants.LATEST_TIMESTAMP);
- if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
- indexRowKey, 0, indexRowKey.length) != 0) {
- return false;
- }
- return true;
- }
-
- @Override
- public boolean next(List<Cell> results) throws IOException {
- int rowCount = 0;
- region.startRegionOperation();
- try {
- byte[] uuidValue = ServerCacheClient.generateId();
- synchronized (innerScanner) {
- do {
- List<Cell> row = new ArrayList<Cell>();
- hasMore = innerScanner.nextRaw(row);
- if (!row.isEmpty()) {
- Put put = null;
- Delete del = null;
- for (Cell cell : row) {
- if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
- if (put == null) {
- put = new Put(CellUtil.cloneRow(cell));
- setMutationAttributes(put, uuidValue);
- mutations.add(put);
- }
- put.add(cell);
- } else {
- if (del == null) {
- del = new Delete(CellUtil.cloneRow(cell));
- setMutationAttributes(del, uuidValue);
- mutations.add(del);
- }
- del.addDeleteMarker(cell);
- }
- }
- uuidValue = commitIfReady(uuidValue);
- if (!scan.isRaw()) {
- Delete deleteMarkers = generateDeleteMarkers(row);
- if (deleteMarkers != null) {
- setMutationAttributes(deleteMarkers, uuidValue);
- mutations.add(deleteMarkers);
- uuidValue = commitIfReady(uuidValue);
- }
- }
- if (indexRowKey != null) {
- // GlobalIndexChecker passed the index row key. This is to build a single index row.
- // Check if the data table row we have just scanned matches with the index row key.
- // If not, there is no need to build the index row from this data table row,
- // and just return zero row count.
- if (checkIndexRow(indexRowKey, put)) {
- rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
- }
- else {
- rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
- }
- break;
- }
- rowCount++;
- }
-
- } while (hasMore && rowCount < pageSizeInRows);
- if (!mutations.isEmpty()) {
- checkForRegionClosingOrSplitting();
- commitBatchWithRetries(region, mutations, blockingMemstoreSize);
- }
- }
- } catch (IOException e) {
- hasMore = false;
- LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
- throw e;
- } finally {
- region.closeRegionOperation();
- }
- byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
- final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
- SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
- results.add(aggKeyValue);
- return hasMore;
- }
-
- @Override
- public long getMaxResultSize() {
- return scan.getMaxResultSize();
- }
- }
-
private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
- final Configuration config) throws IOException {
+ final RegionCoprocessorEnvironment env) throws IOException {
- RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, config);
+ RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
return scanner;
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1e8649e..6e12950 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -125,13 +125,14 @@ public class IndexRegionObserver extends BaseRegionObserver {
}
}
- private static boolean failPreIndexUpdatesForTesting = false;
- private static boolean failPostIndexUpdatesForTesting = false;
- private static boolean failDataTableUpdatesForTesting = false;
+ private static boolean ignoreIndexRebuildForTesting = false;
+ private static boolean failPreIndexUpdatesForTesting = false;
+ private static boolean failPostIndexUpdatesForTesting = false;
+ private static boolean failDataTableUpdatesForTesting = false;
- public static void setFailPreIndexUpdatesForTesting(boolean fail) {
- failPreIndexUpdatesForTesting = fail;
- }
+ public static void setIgnoreIndexRebuildForTesting(boolean ignore) { ignoreIndexRebuildForTesting = ignore; }
+
+ public static void setFailPreIndexUpdatesForTesting(boolean fail) { failPreIndexUpdatesForTesting = fail; }
public static void setFailPostIndexUpdatesForTesting(boolean fail) { failPostIndexUpdatesForTesting = fail; }
@@ -796,8 +797,10 @@ public class IndexRegionObserver extends BaseRegionObserver {
private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ if (ignoreIndexRebuildForTesting && context.rebuild) {
+ return;
+ }
long start = EnvironmentEdgeManager.currentTimeMillis();
-
try {
if (failPreIndexUpdatesForTesting) {
throw new DoNotRetryIOException("Simulating the first (i.e., pre) index table write failure");
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 7e61b2d..36af59e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.compile.*;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -42,6 +41,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
/**
@@ -92,6 +92,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
try {
scan.setTimeRange(0, scn);
scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+ scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE, getIndexVerifyType(configuration).toBytes());
} catch (IOException e) {
throw new SQLException(e);
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 9a589b9..dcabdfd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
@@ -71,6 +72,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.compile.PostIndexDDLCompiler;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -86,12 +88,14 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.EquiDepthStreamHistogram;
@@ -112,6 +116,59 @@ import com.google.common.collect.Lists;
*
*/
public class IndexTool extends Configured implements Tool {
+ public enum IndexVerifyType {
+ BEFORE("BEFORE"),
+ AFTER("AFTER"),
+ BOTH("BOTH"),
+ ONLY("ONLY"),
+ NONE("NONE");
+ private String value;
+ private byte[] valueBytes;
+
+ private IndexVerifyType(String value) {
+ this.value = value;
+ this.valueBytes = PVarchar.INSTANCE.toBytes(value);
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ public byte[] toBytes() {
+ return this.valueBytes;
+ }
+
+ public static IndexVerifyType fromValue(String value) {
+ for (IndexVerifyType verifyType: IndexVerifyType.values()) {
+ if (value.equals(verifyType.getValue())) {
+ return verifyType;
+ }
+ }
+ throw new IllegalStateException("Invalid value: "+ value + " for " + IndexVerifyType.class);
+ }
+
+ public static IndexVerifyType fromValue(byte[] value) {
+ return fromValue(Bytes.toString(value));
+ }
+ }
+
+ public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
+ public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
+ public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+ public final static String DATA_TABLE_NAME = "DTName";
+ public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
+ public static String INDEX_TABLE_NAME = "ITName";
+ public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME);
+ public static String DATA_TABLE_ROW_KEY = "DTRowKey";
+ public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY);
+ public static String INDEX_TABLE_ROW_KEY = "ITRowKey";
+ public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY);
+ public static String DATA_TABLE_TS = "DTTS";
+ public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS);
+ public static String INDEX_TABLE_TS = "ITTS";
+ public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
+ public static String ERROR_MESSAGE = "Error";
+ public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);
@@ -119,6 +176,7 @@ public class IndexTool extends Configured implements Tool {
private String dataTable;
private String indexTable;
private boolean isPartialBuild;
+ private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
private String qDataTable;
private String qIndexTable;
private boolean useSnapshot;
@@ -144,6 +202,17 @@ public class IndexTool extends Configured implements Tool {
private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
"This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");
+    /** -v/--verify: selects the inline verification mode (see {@code IndexVerifyType}). */
+    private static final Option VERIFY_OPTION = new Option("v", "verify", true,
+            "To verify every data row has a corresponding index row. The accepted values are NONE, ONLY, BEFORE," +
+                    " AFTER, and BOTH. NONE is for no inline verification, which is also the default for this option. " +
+                    "ONLY is for verifying without rebuilding index rows. The rest are for verifying before, after, and " +
+                    "both before and after rebuilding rows. If the verification is done before rebuilding rows, " +
+                    "the correct index rows will not be rebuilt. Currently supported values are NONE, ONLY and AFTER.");
+
+    /** -ov/--only-verify: verify without building; ignored when -v is given. */
+    private static final Option ONLY_VERIFY_OPTION = new Option("ov", "only-verify", false,
+            "To verify every data table row has the corresponding index row with the correct content " +
+            "(without building the index table). If the verify option is set then the only-verify option will be ignored");
+
private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
private static final Option SPLIT_INDEX_OPTION =
@@ -189,6 +258,8 @@ public class IndexTool extends Configured implements Tool {
options.addOption(INDEX_TABLE_OPTION);
options.addOption(PARTIAL_REBUILD_OPTION);
options.addOption(DIRECT_API_OPTION);
+ options.addOption(VERIFY_OPTION);
+ options.addOption(ONLY_VERIFY_OPTION);
options.addOption(RUN_FOREGROUND_OPTION);
options.addOption(OUTPUT_PATH_OPTION);
options.addOption(SNAPSHOT_OPTION);
@@ -510,6 +581,7 @@ public class IndexTool extends Configured implements Tool {
PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
+ PhoenixConfigurationUtil.setIndexVerifyType(configuration, indexVerifyType);
String physicalIndexTable = pIndexTable.getPhysicalName().getString();
PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
@@ -569,6 +641,21 @@ public class IndexTool extends Configured implements Tool {
return job;
}
+    /**
+     * Creates the IndexTool output table ({@code OUTPUT_TABLE_NAME}) with a single
+     * {@code QueryConstants.DEFAULT_COLUMN_FAMILY} column family, unless it already exists.
+     *
+     * @param connection Phoenix connection used to obtain an HBase {@link Admin}
+     * @throws Exception if unwrapping the connection or an HBase admin operation fails
+     */
+    private void createIndexToolOutputTable(Connection connection) throws Exception {
+        ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
+        // Admin is Closeable; release it so every IndexTool run does not leak an admin handle.
+        try (Admin admin = queryServices.getAdmin()) {
+            TableName outputTableName = TableName.valueOf(OUTPUT_TABLE_NAME);
+            if (admin.tableExists(outputTableName)) {
+                return;
+            }
+            HTableDescriptor tableDescriptor = new HTableDescriptor(outputTableName);
+            tableDescriptor.setValue("DISABLE_TABLE_SOR", "true");
+            HColumnDescriptor columnDescriptor = new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY);
+            // TTL is a column-family attribute in HBase; a table-level "TTL" value is not applied
+            // to stored cells, so set it on the family that holds the output rows.
+            columnDescriptor.setTimeToLive(MetaDataProtocol.DEFAULT_LOG_TTL);
+            tableDescriptor.addFamily(columnDescriptor);
+            try {
+                admin.createTable(tableDescriptor);
+            } catch (org.apache.hadoop.hbase.TableExistsException ignored) {
+                // A concurrent IndexTool run created the table between tableExists() and
+                // createTable(); the table we need is present, so this is not an error.
+            }
+        }
+    }
+
@Override
public int run(String[] args) throws Exception {
Connection connection = null;
@@ -592,6 +679,14 @@ public class IndexTool extends Configured implements Tool {
dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+ if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
+ String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
+ indexVerifyType = IndexVerifyType.fromValue(value);
+ if (!(indexVerifyType == IndexVerifyType.NONE || indexVerifyType == IndexVerifyType.AFTER ||
+ indexVerifyType == IndexVerifyType.ONLY)) {
+ throw new IllegalStateException("Unsupported value for the verify option");
+ }
+ }
qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
@@ -606,7 +701,8 @@ public class IndexTool extends Configured implements Tool {
pIndexTable = null;
connection = ConnectionUtil.getInputConnection(configuration);
-
+ createIndexToolOutputTable(connection);
+
if (indexTable != null) {
if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
throw new IllegalArgumentException(String.format(
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 39ce183..4c50e0e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -45,6 +45,7 @@ import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
@@ -146,6 +147,8 @@ public final class PhoenixConfigurationUtil {
public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes";
+ // Key under which setIndexVerifyType stores the IndexTool verification mode;
+ // getIndexVerifyType falls back to NONE when the key is unset.
+ public static final String INDEX_VERIFY_TYPE = "phoenix.mr.index.IndexVerifyType";
+
// Generate splits based on scans from stats, or just from region splits
public static final String MAPREDUCE_SPLIT_BY_STATS = "phoenix.mapreduce.split.by.stats";
@@ -561,6 +564,11 @@ public final class PhoenixConfigurationUtil {
configuration.set(DISABLED_INDEXES, indexName);
}
+    /**
+     * Records the IndexTool verification mode in the job configuration.
+     *
+     * @param configuration job configuration to write into; must not be null
+     * @param verifyType verification mode whose string value is stored under INDEX_VERIFY_TYPE
+     */
+    public static void setIndexVerifyType(Configuration configuration, IndexTool.IndexVerifyType verifyType) {
+        Preconditions.checkNotNull(configuration);
+        final String serializedType = verifyType.getValue();
+        configuration.set(INDEX_VERIFY_TYPE, serializedType);
+    }
+
public static String getScrutinyDataTableName(Configuration configuration) {
Preconditions.checkNotNull(configuration);
return configuration.get(SCRUTINY_DATA_TABLE_NAME);
@@ -686,6 +694,12 @@ public final class PhoenixConfigurationUtil {
return configuration.get(DISABLED_INDEXES);
}
+    /**
+     * Reads the IndexTool verification mode from the job configuration.
+     *
+     * @param configuration job configuration to read from; must not be null
+     * @return the mode stored under INDEX_VERIFY_TYPE, or NONE when the key is absent
+     */
+    public static IndexTool.IndexVerifyType getIndexVerifyType(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        final String fallback = IndexTool.IndexVerifyType.NONE.getValue();
+        return IndexTool.IndexVerifyType.fromValue(configuration.get(INDEX_VERIFY_TYPE, fallback));
+    }
+
public static boolean getSplitByStats(final Configuration configuration) {
Preconditions.checkNotNull(configuration);
boolean split = configuration.getBoolean(MAPREDUCE_SPLIT_BY_STATS, DEFAULT_SPLIT_BY_STATS);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 3dab605..e048467 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -358,7 +358,7 @@ public class QueryServicesOptions {
public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
- public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 16*1024;
+ // NOTE(review): raised from 16*1024 to 32*1024 rows by this change; presumably the number of rows
+ // handled per page during a server-side index rebuild -- confirm against the readers of this default.
+ public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;