Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/01/13 08:31:59 UTC

[phoenix] branch 4.x-HBase-1.3 updated (4a905b9 -> 7a525c1)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a change to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


    from 4a905b9  PHOENIX-5651: IndexScrutiny does not handle TTL/row-expiry
     new 4fff098  PHOENIX-5658 IndexTool to verify index rows inline
     new 662d726  PHOENIX-5658 IndexTool to verify index rows inline (addendum - removed the unused only-verify option)
     new 7a525c1  PHOENIX-5666 IndexRegionObserver incorrectly updates PostIndexUpdateFailure metric

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 168 +++++-
 .../phoenix/compile/ServerBuildIndexCompiler.java  |   5 -
 .../coprocessor/BaseScannerRegionObserver.java     |   3 +
 .../coprocessor/IndexRebuildRegionScanner.java     | 629 +++++++++++++++++++++
 .../UngroupedAggregateRegionObserver.java          | 236 +-------
 .../phoenix/hbase/index/IndexRegionObserver.java   |  19 +-
 .../PhoenixServerBuildIndexInputFormat.java        |   3 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  93 ++-
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  14 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +-
 10 files changed, 922 insertions(+), 250 deletions(-)
 create mode 100644 phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
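
For context, the headline change here teaches IndexTool to verify index rows inline
with the rebuild. A rough usage sketch, based on the test helper added in this
change (the runIndexTool signature and IndexVerifyType enum are from the
IndexToolIT/IndexTool diffs below; the schema, table, and index names are
placeholders):

    // Hedged sketch: ask the tool to rebuild and then verify each index row.
    IndexTool tool = IndexToolIT.runIndexTool(
            true,        // directApi: build through the server-side path
            false,       // useSnapshot
            "MY_SCHEMA", "MY_TABLE", "MY_INDEX",
            null,        // tenantId
            0,           // expected exit status
            IndexTool.IndexVerifyType.AFTER); // verify index rows after rebuild

Verification failures are recorded in the PHOENIX_INDEX_TOOL output table rather
than printed, as exercised by the new tests below.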


[phoenix] 01/03: PHOENIX-5658 IndexTool to verify index rows inline

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 4fff0982339cab14bfed238a1e32787652ec857d
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sun Jan 12 17:55:11 2020 -0800

    PHOENIX-5658 IndexTool to verify index rows inline
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 168 +++++-
 .../phoenix/compile/ServerBuildIndexCompiler.java  |   5 -
 .../coprocessor/BaseScannerRegionObserver.java     |   3 +
 .../coprocessor/IndexRebuildRegionScanner.java     | 629 +++++++++++++++++++++
 .../UngroupedAggregateRegionObserver.java          | 236 +-------
 .../phoenix/hbase/index/IndexRegionObserver.java   |  17 +-
 .../PhoenixServerBuildIndexInputFormat.java        |   3 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  98 +++-
 .../mapreduce/util/PhoenixConfigurationUtil.java   |  14 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +-
 10 files changed, 926 insertions(+), 249 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 0cdfc39..2eeeb4a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -38,18 +38,26 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -370,6 +378,139 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private Cell getErrorMessageFromIndexToolOutputTable(Connection conn, String dataTableFullName, String indexTableFullName)
+            throws Exception {
+        byte[] indexTableFullNameBytes = Bytes.toBytes(indexTableFullName);
+        byte[] dataTableFullNameBytes = Bytes.toBytes(dataTableFullName);
+        Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+        Scan scan = new Scan();
+        ResultScanner scanner = hIndexTable.getScanner(scan);
+        boolean dataTableNameCheck = false;
+        boolean indexTableNameCheck = false;
+        Cell errorMessageCell = null;
+        Result result = scanner.next();
+        if (result != null) {
+            for (Cell cell : result.rawCells()) {
+                assertTrue(Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+                        IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, 0,
+                        IndexTool.OUTPUT_TABLE_COLUMN_FAMILY.length) == 0);
+                if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                        IndexTool.DATA_TABLE_NAME_BYTES, 0, IndexTool.DATA_TABLE_NAME_BYTES.length) == 0) {
+                    dataTableNameCheck = true;
+                    assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                            dataTableFullNameBytes, 0, dataTableFullNameBytes.length) == 0);
+                } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                        IndexTool.INDEX_TABLE_NAME_BYTES, 0, IndexTool.INDEX_TABLE_NAME_BYTES.length) == 0) {
+                    indexTableNameCheck = true;
+                    assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                            indexTableFullNameBytes, 0, indexTableFullNameBytes.length) == 0);
+                } else if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+                        IndexTool.ERROR_MESSAGE_BYTES, 0, IndexTool.ERROR_MESSAGE_BYTES.length) == 0) {
+                    errorMessageCell = cell;
+                }
+            }
+        }
+        assertTrue(dataTableNameCheck && indexTableNameCheck && errorMessageCell != null);
+        return errorMessageCell;
+    }
+
+    @Test
+    public void testIndexToolVerifyAfterOption() throws Exception {
+        // This test is for building non-transactional global indexes with the direct API
+        if (localIndex || transactional || !directApi || useSnapshot) {
+            return;
+        }
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String schemaName = generateUniqueName();
+            String dataTableName = generateUniqueName();
+            String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+            String indexTableName = generateUniqueName();
+            String viewName = generateUniqueName();
+            String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) "
+                    + tableDDLOptions);
+            conn.commit();
+            conn.createStatement().execute("CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName);
+            conn.commit();
+            // Insert a row
+            conn.createStatement().execute("upsert into " + viewFullName + " values (1, 'Phoenix', 12345)");
+            conn.commit();
+            // Configure IndexRegionObserver to ignore (skip) index rebuild writes. The index will
+            // therefore not be updated, so the inline verification done during the rebuild should fail
+            IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", indexTableName, viewFullName));
+            // Run the index MR job and verify that the index table rebuild fails
+            runIndexTool(directApi, useSnapshot, schemaName, viewName, indexTableName,
+                    null, -1, IndexTool.IndexVerifyType.AFTER);
+            // The index tool output table should report that there is a missing index row
+            Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, "_IDX_" + dataTableFullName);
+            byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+        }
+    }
+
+    @Test
+    public void testIndexToolOnlyVerifyOption() throws Exception {
+        // This test is for building non-transactional global indexes with the direct API
+        if (localIndex || transactional || !directApi || useSnapshot) {
+            return;
+        }
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE " + dataTableFullName
+                    + " (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, CODE VARCHAR) COLUMN_ENCODED_BYTES=0");
+            // Insert a row
+            conn.createStatement().execute("upsert into " + dataTableFullName + " values (1, 'Phoenix', 'A')");
+            conn.commit();
+            conn.createStatement().execute(String.format(
+                    "CREATE INDEX %s ON %s (NAME) INCLUDE (CODE) ASYNC", indexTableName, dataTableFullName));
+            // Run the index MR job to only verify that each data table row has a corresponding index row
+            // IndexTool will go through each data table row and record the mismatches in the output table
+            // called PHOENIX_INDEX_TOOL
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            Cell cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
+            byte[] expectedValueBytes = Bytes.toBytes("Missing index rows - Expected: 1 Actual: 0");
+            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            // Delete the output table for the next test
+            Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+            TableName indexToolOutputTable = TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME_BYTES);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+            // Run the index tool to populate the index while verifying rows
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.AFTER);
+            // Corrupt one cell by writing directly into the index table
+            conn.createStatement().execute("upsert into " + indexTableFullName + " values ('Phoenix', 1, 'B')");
+            conn.commit();
+            // Run the index tool using the only-verify option to detect this mismatch between the data and index table
+            runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName,
+                    null, 0, IndexTool.IndexVerifyType.ONLY);
+            cell = getErrorMessageFromIndexToolOutputTable(conn, dataTableFullName, indexTableFullName);
+            expectedValueBytes = Bytes.toBytes("Not matching value for 0:0:CODE E:A A:B");
+            assertTrue(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                    expectedValueBytes, 0, expectedValueBytes.length) == 0);
+            admin.disableTable(indexToolOutputTable);
+            admin.deleteTable(indexToolOutputTable);
+        }
+    }
+
     @Test
     public void testIndexToolWithTenantId() throws Exception {
         if (!useTenantId) { return;}
@@ -643,9 +784,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             actualExplainPlan.contains(expectedExplainPlan));
     }
 
-    public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
-            String dataTable, String indxTable, String tenantId) {
-        final List<String> args = Lists.newArrayList();
+    private static List<String> getArgList (boolean directApi, boolean useSnapshot, String schemaName,
+                                            String dataTable, String indxTable, String tenantId,
+                                            IndexTool.IndexVerifyType verifyType) {
+        List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
             args.add(schemaName);
@@ -657,6 +799,8 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         if (directApi) {
             args.add("-direct");
         }
+        args.add("-v" + verifyType.getValue()); // verify index rows inline
+
         // Need to run this job in foreground for the test to be deterministic
         args.add("-runfg");
         if (useSnapshot) {
@@ -670,6 +814,12 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
 
         args.add("-op");
         args.add("/tmp/" + UUID.randomUUID().toString());
+        return args;
+    }
+
+    public static String[] getArgValues(boolean directApi, boolean useSnapshot, String schemaName,
+            String dataTable, String indexTable, String tenantId, IndexTool.IndexVerifyType verifyType) {
+        List<String> args = getArgList(directApi, useSnapshot, schemaName, dataTable, indexTable, tenantId, verifyType);
         return args.toArray(new String[0]);
     }
 
@@ -715,12 +865,20 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
     public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
             String dataTableName, String indexTableName, String tenantId, int expectedStatus,
             String... additionalArgs) throws Exception {
+        return runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId, expectedStatus,
+                IndexTool.IndexVerifyType.NONE, additionalArgs);
+    }
+
+    public static IndexTool runIndexTool(boolean directApi, boolean useSnapshot, String schemaName,
+                                         String dataTableName, String indexTableName, String tenantId,
+                                         int expectedStatus, IndexTool.IndexVerifyType verifyType,
+                                         String... additionalArgs) throws Exception {
         IndexTool indexingTool = new IndexTool();
         Configuration conf = new Configuration(getUtility().getConfiguration());
         conf.set(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString());
         indexingTool.setConf(conf);
-        final String[] cmdArgs =
-                getArgValues(directApi, useSnapshot, schemaName, dataTableName, indexTableName, tenantId);
+        final String[] cmdArgs = getArgValues(directApi, useSnapshot, schemaName, dataTableName,
+                indexTableName, tenantId, verifyType);
         List<String> cmdArgList = new ArrayList<>(Arrays.asList(cmdArgs));
         cmdArgList.addAll(Arrays.asList(additionalArgs));
         int status = indexingTool.run(cmdArgList.toArray(new String[cmdArgList.size()]));
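
The tests read verification results straight out of the PHOENIX_INDEX_TOOL HBase
table. A trimmed sketch of that lookup (error handling omitted; the constants are
the ones referenced in getErrorMessageFromIndexToolOutputTable above):

    // Scan the IndexTool output table and pull out the error-message column.
    Table outputTable = conn.unwrap(PhoenixConnection.class).getQueryServices()
            .getTable(IndexTool.OUTPUT_TABLE_NAME_BYTES);
    try (ResultScanner scanner = outputTable.getScanner(new Scan())) {
        for (Result result : scanner) {
            byte[] errorMsg = result.getValue(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY,
                    IndexTool.ERROR_MESSAGE_BYTES);
            if (errorMsg != null) {
                System.out.println("verification error: " + Bytes.toString(errorMsg));
            }
        }
    }
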
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 7d1c1b4..1b482aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.compile;
 
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
@@ -36,11 +35,7 @@ import org.apache.phoenix.schema.*;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.StringUtil;
-
-import com.google.common.collect.Lists;
 
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 71e7546..6e0a1e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -86,6 +86,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
     // The number of index rows to be rebuilt in one RPC call
     public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
+    // Index verification type done by the index tool
+    public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
+
     /* 
     * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
     * Needed for backward compatibility purposes. TODO: get rid of this in next major release.
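
The verify type travels from the tool to the region server as a scan attribute. A
minimal sketch of both ends, assuming IndexVerifyType.getValue() returns the string
form of the enum and that the client-side half mirrors how the attribute is read
back in IndexRebuildRegionScanner below (the actual wiring lives in the
IndexTool/ServerBuildIndexCompiler changes):

    // Client side (illustrative): tag the rebuild scan with the verify type.
    scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE,
            Bytes.toBytes(verifyType.getValue()));

    // Server side, as done in IndexRebuildRegionScanner's constructor:
    byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
    IndexTool.IndexVerifyType verifyType = (valueBytes != null)
            ? IndexTool.IndexVerifyType.fromValue(valueBytes)
            : IndexTool.IndexVerifyType.NONE;
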
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
new file mode 100644
index 0000000..142871b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.TaskRunner;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.VERIFIED_BYTES;
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.removeEmptyColumn;
+import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+
+public class IndexRebuildRegionScanner extends BaseRegionScanner {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IndexRebuildRegionScanner.class);
+    public static final String NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY = "index.verify.threads.max";
+    private static final int DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS = 17;
+    public static final String INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY = "index.verify.row.count.per.task";
+    private static final int DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK = 2048;
+    private long pageSizeInRows = Long.MAX_VALUE;
+    private int rowCountPerTask;
+    private boolean hasMore;
+    private final int maxBatchSize;
+    private UngroupedAggregateRegionObserver.MutationList mutations;
+    private final long maxBatchSizeBytes;
+    private final long blockingMemstoreSize;
+    private final byte[] clientVersionBytes;
+    private byte[] indexMetaData;
+    private boolean useProto = true;
+    private Scan scan;
+    private RegionScanner innerScanner;
+    private Region region;
+    private IndexMaintainer indexMaintainer;
+    private byte[] indexRowKey = null;
+    private Table indexHTable = null;
+    private Table outputHTable = null;
+    private IndexTool.IndexVerifyType verifyType = IndexTool.IndexVerifyType.NONE;
+    private boolean verify = false;
+    private boolean onlyVerify = false;
+    private Map<byte[], Put> indexKeyToDataPutMap;
+    private TaskRunner pool;
+    private TaskBatch<Boolean> tasks;
+    private String exceptionMessage;
+    private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+    private RegionCoprocessorEnvironment env;
+    private HTableFactory hTableFactory;
+
+    IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
+                               final RegionCoprocessorEnvironment env,
+                               UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver) throws IOException {
+        super(innerScanner);
+        final Configuration config = env.getConfiguration();
+        if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+            pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+                    QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+        }
+        maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        mutations = new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);
+        maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        blockingMemstoreSize = UngroupedAggregateRegionObserver.getBlockingMemstoreSize(region, config);
+        clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        if (indexMetaData == null) {
+            useProto = false;
+            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        }
+        if (!scan.isRaw()) {
+            List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
+            indexMaintainer = maintainers.get(0);
+        }
+        this.scan = scan;
+        this.innerScanner = innerScanner;
+        this.region = region;
+        this.env = env;
+        this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
+        indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
+        byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
+        if (valueBytes != null) {
+            verifyType = IndexTool.IndexVerifyType.fromValue(valueBytes);
+            if (verifyType == IndexTool.IndexVerifyType.AFTER || verifyType == IndexTool.IndexVerifyType.ONLY) {
+                verify = true;
+                if (verifyType == IndexTool.IndexVerifyType.ONLY) {
+                    onlyVerify = true;
+                }
+                hTableFactory = ServerUtil.getDelegateHTableFactory(env, ServerUtil.ConnectionType.INDEX_WRITER_CONNECTION);
+                indexHTable = hTableFactory.getTable(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+                outputHTable = hTableFactory.getTable(new ImmutableBytesPtr(IndexTool.OUTPUT_TABLE_NAME_BYTES));
+                indexKeyToDataPutMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+                pool = new WaitForCompletionTaskRunner(ThreadPoolManager.getExecutor(
+                        new ThreadPoolBuilder("IndexVerify",
+                                env.getConfiguration()).setMaxThread(NUM_CONCURRENT_INDEX_VERIFY_THREADS_CONF_KEY,
+                                DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS).setCoreTimeout(
+                                INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env));
+                tasks = new TaskBatch<>(DEFAULT_CONCURRENT_INDEX_VERIFY_THREADS);
+                rowCountPerTask = config.getInt(INDEX_VERIFY_ROW_COUNTS_PER_TASK_CONF_KEY,
+                        DEFAULT_INDEX_VERIFY_ROW_COUNTS_PER_TASK);
+            }
+        }
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() { return hasMore; }
+
+    @Override
+    public void close() throws IOException {
+        innerScanner.close();
+        if (verify) {
+            this.pool.stop("IndexRebuildRegionScanner is closing");
+            indexHTable.close();
+            outputHTable.close();
+        }
+    }
+
+    private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+        m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+        m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+        m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+        m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+        // Since we're replaying existing mutations, it makes no sense to write them to the WAL
+        m.setDurability(Durability.SKIP_WAL);
+    }
+
+    private Delete generateDeleteMarkers(List<Cell> row) {
+        Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
+        if (row.size() == allColumns.size() + 1) {
+            // We have all the columns for the index table plus the empty column. So, no delete marker is needed
+            return null;
+        }
+        Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
+        long ts = 0;
+        for (Cell cell : row) {
+            includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
+            if (ts < cell.getTimestamp()) {
+                ts = cell.getTimestamp();
+            }
+        }
+        byte[] rowKey;
+        Delete del = null;
+        for (ColumnReference column : allColumns) {
+            if (!includedColumns.contains(column)) {
+                if (del == null) {
+                    Cell cell = row.get(0);
+                    rowKey = CellUtil.cloneRow(cell);
+                    del = new Delete(rowKey);
+                }
+                del.addColumns(column.getFamily(), column.getQualifier(), ts);
+            }
+        }
+        return del;
+    }
+
+    private void addToBeVerifiedIndexRows() throws IOException {
+        for (Mutation mutation : mutations) {
+            if (mutation instanceof Put) {
+                indexKeyToDataPutMap.put(getIndexRowKey((Put)mutation), (Put)mutation);
+            }
+        }
+    }
+
+    private byte[] commitIfReady(byte[] uuidValue) throws IOException {
+        if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+            ungroupedAggregateRegionObserver.checkForRegionClosing();
+            ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+            uuidValue = ServerCacheClient.generateId();
+            if (verify) {
+                addToBeVerifiedIndexRows();
+            }
+            mutations.clear();
+        }
+        return uuidValue;
+    }
+
+    private class SimpleValueGetter implements ValueGetter {
+        final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
+        final Put put;
+        SimpleValueGetter (final Put put) {
+            this.put = put;
+        }
+        @Override
+        public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
+            List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
+            if (cellList == null || cellList.isEmpty()) {
+                return null;
+            }
+            Cell cell = cellList.get(0);
+            valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            return valuePtr;
+        }
+
+        @Override
+        public byte[] getRowKey() {
+            return put.getRow();
+        }
+
+    }
+
+    private byte[] getIndexRowKey(final Put dataRow) throws IOException {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        byte[] builtIndexRowKey = indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()),
+                null, null, HConstants.LATEST_TIMESTAMP);
+        return builtIndexRowKey;
+    }
+
+    private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
+        byte[] builtIndexRowKey = getIndexRowKey(put);
+        if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
+                indexRowKey, 0, indexRowKey.length) != 0) {
+            return false;
+        }
+        return true;
+    }
+
+    private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                           String errorMsg) {
+        logToIndexToolOutputTable(dataRowKey, indexRowKey, dataRowTs, indexRowTs,
+                errorMsg, null, null);
+    }
+
+    private void logToIndexToolOutputTable(byte[] dataRowKey, byte[] indexRowKey, long dataRowTs, long indexRowTs,
+                                           String errorMsg, byte[] expectedValue,  byte[] actualValue) {
+        final byte[] E_VALUE_PREFIX_BYTES = Bytes.toBytes(" E:");
+        final byte[] A_VALUE_PREFIX_BYTES = Bytes.toBytes(" A:");
+        final int PREFIX_LENGTH = 3;
+        final int TOTAL_PREFIX_LENGTH = 6;
+
+        try {
+            int longLength = Long.SIZE / Byte.SIZE;
+            byte[] rowKey;
+            if (dataRowKey != null) {
+                rowKey = new byte[longLength + dataRowKey.length];
+                Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
+                Bytes.putBytes(rowKey, longLength, dataRowKey, 0, dataRowKey.length);
+            } else {
+                rowKey = new byte[longLength];
+                Bytes.putLong(rowKey, 0, scan.getTimeRange().getMax());
+            }
+            Put put = new Put(rowKey);
+            long scanMaxTs = scan.getTimeRange().getMax();
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_NAME_BYTES,
+                    scanMaxTs, region.getRegionInfo().getTable().getName());
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_NAME_BYTES,
+                    scanMaxTs, indexMaintainer.getIndexTableName());
+            if (dataRowKey != null) {
+                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.DATA_TABLE_TS_BYTES,
+                        scanMaxTs, Bytes.toBytes(dataRowTs));
+            }
+            if (indexRowKey != null) {
+                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_ROW_KEY_BYTES,
+                        scanMaxTs, indexRowKey);
+                put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.INDEX_TABLE_TS_BYTES,
+                        scanMaxTs, Bytes.toBytes(indexRowTs));
+            }
+            byte[] errorMessageBytes;
+            if (expectedValue != null) {
+                errorMessageBytes = new byte[errorMsg.length() + expectedValue.length + actualValue.length +
+                        TOTAL_PREFIX_LENGTH];
+                Bytes.putBytes(errorMessageBytes, 0, Bytes.toBytes(errorMsg), 0, errorMsg.length());
+                int length = errorMsg.length();
+                Bytes.putBytes(errorMessageBytes, length, E_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+                length += PREFIX_LENGTH;
+                Bytes.putBytes(errorMessageBytes, length, expectedValue, 0, expectedValue.length);
+                length += expectedValue.length;
+                Bytes.putBytes(errorMessageBytes, length, A_VALUE_PREFIX_BYTES, 0, PREFIX_LENGTH);
+                length += PREFIX_LENGTH;
+                Bytes.putBytes(errorMessageBytes, length, actualValue, 0, actualValue.length);
+            } else {
+                errorMessageBytes = Bytes.toBytes(errorMsg);
+            }
+            put.addColumn(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY, IndexTool.ERROR_MESSAGE_BYTES, scanMaxTs, errorMessageBytes);
+            outputHTable.put(put);
+        } catch (IOException e) {
+            exceptionMessage = "LogToIndexToolOutputTable failed " + e;
+        }
+    }
+
+    private long getMaxTimestamp(Result result) {
+        long ts = 0;
+        for (Cell cell : result.rawCells()) {
+            if (ts < cell.getTimestamp()) {
+                ts = cell.getTimestamp();
+            }
+        }
+        return ts;
+    }
+
+    private void verifySingleIndexRow(Result indexRow, final Put dataRow) throws IOException {
+        ValueGetter valueGetter = new SimpleValueGetter(dataRow);
+        long ts = 0;
+        for (List<Cell> cells : dataRow.getFamilyCellMap().values()) {
+            if (cells == null) {
+                break;
+            }
+            for (Cell cell : cells) {
+                if (ts < cell.getTimestamp()) {
+                    ts = cell.getTimestamp();
+                }
+            }
+        }
+        Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                valueGetter, new ImmutableBytesWritable(dataRow.getRow()), ts, null, null);
+        if (indexPut == null) {
+            // This means the index row does not have any covered columns. We just need to check if the index row
+            // has only one cell (which is the empty column cell)
+            if (indexRow.rawCells().length == 1) {
+                return;
+            }
+            String errorMsg = "Expected to find only empty column cell but got "
+                    + indexRow.rawCells().length;
+            logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+            if (onlyVerify) {
+                return;
+            }
+            exceptionMessage = "Index verify failed - " + errorMsg + indexHTable.getName();
+            throw new IOException(exceptionMessage);
+        }
+        else {
+            // Remove the empty column prepared by Index codec as we need to change its value
+            removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                    indexMaintainer.getEmptyKeyValueQualifier());
+        }
+        // Add the empty column
+        indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                indexMaintainer.getEmptyKeyValueQualifier(), ts, VERIFIED_BYTES);
+        int cellCount = 0;
+        for (List<Cell> cells : indexPut.getFamilyCellMap().values()) {
+            if (cells == null) {
+                break;
+            }
+            for (Cell expectedCell : cells) {
+                byte[] family = CellUtil.cloneFamily(expectedCell);
+                byte[] qualifier = CellUtil.cloneQualifier(expectedCell);
+                Cell actualCell = indexRow.getColumnLatestCell(family, qualifier);
+                if (actualCell == null) {
+                    String errorMsg = " Missing cell " + Bytes.toString(family) + ":" +
+                            Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+                    if (onlyVerify) {
+                        return;
+                    }
+                    exceptionMessage = "Index verify failed - Missing cell " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                // Check all columns
+                if (!CellUtil.matchingValue(actualCell, expectedCell)) {
+                    String errorMsg = "Not matching value for " + Bytes.toString(family) + ":" +
+                            Bytes.toString(qualifier);
+                    logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow),
+                            errorMsg, CellUtil.cloneValue(expectedCell), CellUtil.cloneValue(actualCell));
+                    if (onlyVerify) {
+                        return;
+                    }
+                    exceptionMessage = "Index verify failed - Not matching cell value - " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                cellCount++;
+            }
+        }
+        if (cellCount != indexRow.rawCells().length) {
+            String errorMsg = "Expected to find " + cellCount + " cells but got "
+                    + indexRow.rawCells().length + " cells";
+            logToIndexToolOutputTable(dataRow.getRow(), indexRow.getRow(), ts, getMaxTimestamp(indexRow), errorMsg);
+            if (!onlyVerify) {
+                exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
+                throw new IOException(exceptionMessage);
+            }
+        }
+    }
+
+    private void verifyIndexRows(ArrayList<KeyRange> keys) throws IOException {
+        int expectedRowCount = keys.size();
+        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+        Scan indexScan = new Scan();
+        indexScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+        scanRanges.initializeScan(indexScan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        indexScan.setFilter(skipScanFilter);
+        int rowCount = 0;
+        try (ResultScanner resultScanner = indexHTable.getScanner(indexScan)) {
+            for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
+                Put dataPut = indexKeyToDataPutMap.get(result.getRow());
+                if (dataPut == null) {
+                    exceptionMessage = "Index verify failed - Missing data row - " + indexHTable.getName();
+                    String errorMsg = "Missing data row";
+                    logToIndexToolOutputTable(null, result.getRow(), 0, getMaxTimestamp(result), errorMsg);
+                    if (!onlyVerify) {
+                        throw new IOException(exceptionMessage);
+                    }
+                    // In only-verify mode, skip this orphan index row; verifySingleIndexRow
+                    // below would otherwise dereference a null data row
+                    continue;
+                }
+                verifySingleIndexRow(result, dataPut);
+                rowCount++;
+            }
+        } catch (Throwable t) {
+            ServerUtil.throwIOException(indexHTable.getName().toString(), t);
+        }
+        if (rowCount != expectedRowCount) {
+            String errorMsg = "Missing index rows - Expected: " + expectedRowCount +
+                    " Actual: " + rowCount;
+            exceptionMessage = "Index verify failed - " + errorMsg + " - " + indexHTable.getName();
+            logToIndexToolOutputTable(null, null, 0, 0, errorMsg);
+            if (!onlyVerify) {
+                throw new IOException(exceptionMessage);
+            }
+        }
+    }
+
+    private void addVerifyTask(final ArrayList<KeyRange> keys) {
+        tasks.add(new Task<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                if (Thread.currentThread().isInterrupted()) {
+                    exceptionMessage = "Pool closed, not attempting to verify index rows! " + indexHTable.getName();
+                    throw new IOException(exceptionMessage);
+                }
+                verifyIndexRows(keys);
+                return Boolean.TRUE;
+            }
+        });
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+        int rowCount = 0;
+        region.startRegionOperation();
+        try {
+            byte[] uuidValue = ServerCacheClient.generateId();
+            synchronized (innerScanner) {
+                do {
+                    List<Cell> row = new ArrayList<Cell>();
+                    hasMore = innerScanner.nextRaw(row);
+                    if (!row.isEmpty()) {
+                        Put put = null;
+                        Delete del = null;
+                        for (Cell cell : row) {
+                            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                if (put == null) {
+                                    put = new Put(CellUtil.cloneRow(cell));
+                                    setMutationAttributes(put, uuidValue);
+                                    mutations.add(put);
+                                }
+                                put.add(cell);
+                            } else {
+                                if (del == null) {
+                                    del = new Delete(CellUtil.cloneRow(cell));
+                                    setMutationAttributes(del, uuidValue);
+                                    mutations.add(del);
+                                }
+                                del.addDeleteMarker(cell);
+                            }
+                        }
+                        if (onlyVerify) {
+                            rowCount++;
+                            continue;
+                        }
+                        uuidValue = commitIfReady(uuidValue);
+                        if (!scan.isRaw()) {
+                            Delete deleteMarkers = generateDeleteMarkers(row);
+                            if (deleteMarkers != null) {
+                                setMutationAttributes(deleteMarkers, uuidValue);
+                                mutations.add(deleteMarkers);
+                                uuidValue = commitIfReady(uuidValue);
+                            }
+                        }
+                        if (indexRowKey != null) {
+                            // GlobalIndexChecker passed the index row key. This is to build a single index row.
+                            // Check if the data table row we have just scanned matches the index row key.
+                            // If not, there is no need to build the index row from this data table row,
+                            // and the NO_INDEX_ROW return code is reported instead.
+                            if (checkIndexRow(indexRowKey, put)) {
+                                rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
+                            }
+                            else {
+                                rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
+                            }
+                            break;
+                        }
+                        rowCount++;
+                    }
+
+                } while (hasMore && rowCount < pageSizeInRows);
+                if (!mutations.isEmpty() && !onlyVerify) {
+                    ungroupedAggregateRegionObserver.checkForRegionClosing();
+                    ungroupedAggregateRegionObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+                    if (verify) {
+                        addToBeVerifiedIndexRows();
+                    }
+                }
+            }
+        } catch (IOException e) {
+            hasMore = false;
+            LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
+            throw e;
+        } finally {
+            region.closeRegionOperation();
+        }
+        if (verify) {
+            if (onlyVerify) {
+                addToBeVerifiedIndexRows();
+            }
+            ArrayList<KeyRange> keys = new ArrayList<>(rowCountPerTask);
+            for (byte[] key : indexKeyToDataPutMap.keySet()) {
+                keys.add(PVarbinary.INSTANCE.getKeyRange(key));
+                if (keys.size() == rowCountPerTask) {
+                    addVerifyTask(keys);
+                    keys = new ArrayList<>(rowCountPerTask);
+                }
+            }
+            if (keys.size() > 0) {
+                addVerifyTask(keys);
+            }
+            List<Boolean> taskResultList = null;
+            try {
+                LOGGER.debug("Waiting on index verify tasks to complete...");
+                taskResultList = this.pool.submitUninterruptible(tasks);
+            } catch (ExecutionException e) {
+                throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+            } catch (EarlyExitFailure e) {
+                throw new RuntimeException("Stopped while waiting for batch, quitting!", e);
+            } finally {
+                indexKeyToDataPutMap.clear();
+                tasks.getTasks().clear();
+            }
+            for (Boolean result : taskResultList) {
+                if (result == null) {
+                    // there was a failure
+                    throw new IOException(exceptionMessage);
+                }
+            }
+        }
+        byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+        final Cell aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+        results.add(aggKeyValue);
+        return hasMore;
+    }
+
+    @Override
+    public long getMaxResultSize() {
+        return scan.getMaxResultSize();
+    }
+}
\ No newline at end of file
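
Condensed from verifySingleIndexRow above, the heart of the check is: rebuild the
expected index Put for a data row through the IndexMaintainer, then compare it
cell by cell against the actual index row fetched from the index table. A
non-drop-in sketch (ts is the max timestamp across the data row's cells, as
computed above):

    // Expected index mutation derived from the data row.
    Put expected = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
            new SimpleValueGetter(dataRow), new ImmutableBytesWritable(dataRow.getRow()),
            ts, null, null);
    for (List<Cell> cells : expected.getFamilyCellMap().values()) {
        for (Cell expectedCell : cells) {
            Cell actual = indexRow.getColumnLatestCell(
                    CellUtil.cloneFamily(expectedCell), CellUtil.cloneQualifier(expectedCell));
            if (actual == null || !CellUtil.matchingValue(actual, expectedCell)) {
                // Record a row in PHOENIX_INDEX_TOOL; in AFTER mode this also
                // fails the rebuild, while in ONLY mode verification just continues.
            }
        }
    }
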
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 278d4e7..6d25697 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,7 +21,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
@@ -38,7 +37,6 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
@@ -46,7 +44,6 @@ import javax.annotation.concurrent.GuardedBy;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -57,7 +54,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -81,7 +77,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
@@ -98,7 +93,6 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -244,7 +238,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
     }
 
-    private void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
+    public void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
         try {
             commitBatch(region, localRegionMutations, blockingMemstoreSize);
         } catch (IOException e) {
@@ -272,7 +266,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
       // flush happen which decrease the memstore size and then writes allowed on the region.
       for (int i = 0; blockingMemstoreSize > 0 && region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
           try {
-              checkForRegionClosingOrSplitting();
+              checkForRegionClosing();
               Thread.sleep(100);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
@@ -321,7 +315,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
      * a high chance that flush might not proceed and memstore won't be freed up.
      * @throws IOException
      */
-    private void checkForRegionClosingOrSplitting() throws IOException {
+    public void checkForRegionClosing() throws IOException {
         synchronized (lock) {
             if(isRegionClosingOrSplitting) {
                 lock.notifyAll();
@@ -379,7 +373,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-   private long getBlockingMemstoreSize(Region region, Configuration conf) {
+    public static long getBlockingMemstoreSize(Region region, Configuration conf) {
        long flushSize = region.getTableDesc().getMemStoreFlushSize();
 
        if (flushSize <= 0) {
@@ -407,7 +401,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     gp_width_bytes, gp_per_region_bytes);
             return collectStats(s, statsCollector, region, scan, env.getConfiguration());
         } else if (ScanUtil.isIndexRebuild(scan)) {
-            return rebuildIndices(s, region, scan, env.getConfiguration());
+            return rebuildIndices(s, region, scan, env);
         }
 
         PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
@@ -1055,226 +1049,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    private class IndexRebuildRegionScanner extends BaseRegionScanner {
-        private long pageSizeInRows = Long.MAX_VALUE;
-        private boolean hasMore;
-        private final int maxBatchSize;
-        private MutationList mutations;
-        private final long maxBatchSizeBytes;
-        private final long blockingMemstoreSize;
-        private final byte[] clientVersionBytes;
-        private List<Cell> results = new ArrayList<Cell>();
-        private byte[] indexMetaData;
-        private boolean useProto = true;
-        private Scan scan;
-        private RegionScanner innerScanner;
-        private Region region;
-        private IndexMaintainer indexMaintainer;
-        private byte[] indexRowKey = null;
-
-        IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
-                                   final Configuration config) {
-            super(innerScanner);
-            if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
-                pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
-                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
-            }
-
-            maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            mutations = new MutationList(maxBatchSize);
-            maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
-                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
-            blockingMemstoreSize = getBlockingMemstoreSize(region, config);
-            clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
-            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-            if (indexMetaData == null) {
-                useProto = false;
-                indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
-            }
-            if (!scan.isRaw()) {
-                List<IndexMaintainer> maintainers = IndexMaintainer.deserialize(indexMetaData, true);
-                indexMaintainer = maintainers.get(0);
-            }
-            this.scan = scan;
-            this.innerScanner = innerScanner;
-            this.region = region;
-            indexRowKey = scan.getAttribute(BaseScannerRegionObserver.INDEX_ROW_KEY);
-        }
-
-        @Override
-        public HRegionInfo getRegionInfo() {
-            return region.getRegionInfo();
-        }
-
-        @Override
-        public boolean isFilterDone() { return hasMore; }
-
-        @Override
-        public void close() throws IOException { innerScanner.close(); }
-
-        private void setMutationAttributes(Mutation m, byte[] uuidValue) {
-            m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-            m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-            m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                    BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-            m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
-            // Since we're replaying existing mutations, it makes no sense to write them to the wal
-            m.setDurability(Durability.SKIP_WAL);
-        }
-
-        private Delete generateDeleteMarkers(List<Cell> row) {
-            Set<ColumnReference> allColumns = indexMaintainer.getAllColumns();
-            if (row.size() == allColumns.size() + 1) {
-                // We have all the columns for the index table plus the empty column. So, no delete marker is needed
-                return null;
-            }
-            Set<ColumnReference> includedColumns = Sets.newLinkedHashSetWithExpectedSize(row.size());
-            long ts = 0;
-            for (Cell cell : row) {
-                includedColumns.add(new ColumnReference(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell)));
-                if (ts < cell.getTimestamp()) {
-                    ts = cell.getTimestamp();
-                }
-            }
-            byte[] rowKey;
-            Delete del = null;
-            for (ColumnReference column : allColumns) {
-                if (!includedColumns.contains(column)) {
-                    if (del == null) {
-                        Cell cell = row.get(0);
-                        rowKey = CellUtil.cloneRow(cell);
-                        del = new Delete(rowKey);
-                    }
-                    del.addColumns(column.getFamily(), column.getQualifier(), ts);
-                }
-            }
-            return del;
-        }
-
-        private byte[] commitIfReady(byte[] uuidValue) throws IOException {
-            if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                checkForRegionClosingOrSplitting();
-                commitBatchWithRetries(region, mutations, blockingMemstoreSize);
-                uuidValue = ServerCacheClient.generateId();
-                mutations.clear();
-            }
-            return uuidValue;
-        }
-
-        private boolean checkIndexRow(final byte[] indexRowKey, final Put put) throws IOException {
-            ValueGetter getter = new ValueGetter() {
-                final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable();
-
-                @Override
-                public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException {
-                    List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
-                    if (cellList == null || cellList.isEmpty()) {
-                        return null;
-                    }
-                    Cell cell = cellList.get(0);
-                    valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                    return valuePtr;
-                }
-
-                @Override
-                public byte[] getRowKey() {
-                    return put.getRow();
-                }
-            };
-            byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(put.getRow()),
-                    null, null, HConstants.LATEST_TIMESTAMP);
-            if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
-                    indexRowKey, 0, indexRowKey.length) != 0) {
-                return false;
-            }
-            return true;
-        }
-
-        @Override
-        public boolean next(List<Cell> results) throws IOException {
-            int rowCount = 0;
-            region.startRegionOperation();
-            try {
-                byte[] uuidValue = ServerCacheClient.generateId();
-                synchronized (innerScanner) {
-                    do {
-                        List<Cell> row = new ArrayList<Cell>();
-                        hasMore = innerScanner.nextRaw(row);
-                        if (!row.isEmpty()) {
-                            Put put = null;
-                            Delete del = null;
-                            for (Cell cell : row) {
-                                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
-                                    if (put == null) {
-                                        put = new Put(CellUtil.cloneRow(cell));
-                                        setMutationAttributes(put, uuidValue);
-                                        mutations.add(put);
-                                    }
-                                    put.add(cell);
-                                } else {
-                                    if (del == null) {
-                                        del = new Delete(CellUtil.cloneRow(cell));
-                                        setMutationAttributes(del, uuidValue);
-                                        mutations.add(del);
-                                    }
-                                    del.addDeleteMarker(cell);
-                                }
-                            }
-                            uuidValue = commitIfReady(uuidValue);
-                            if (!scan.isRaw()) {
-                                Delete deleteMarkers = generateDeleteMarkers(row);
-                                if (deleteMarkers != null) {
-                                    setMutationAttributes(deleteMarkers, uuidValue);
-                                    mutations.add(deleteMarkers);
-                                    uuidValue = commitIfReady(uuidValue);
-                                }
-                            }
-                            if (indexRowKey != null) {
-                                // GlobalIndexChecker passed the index row key. This is to build a single index row.
-                                // Check if the data table row we have just scanned matches with the index row key.
-                                // If not, there is no need to build the index row from this data table row,
-                                // and just return zero row count.
-                                if (checkIndexRow(indexRowKey, put)) {
-                                    rowCount = GlobalIndexChecker.RebuildReturnCode.INDEX_ROW_EXISTS.getValue();
-                                }
-                                else {
-                                    rowCount = GlobalIndexChecker.RebuildReturnCode.NO_INDEX_ROW.getValue();
-                                }
-                                break;
-                            }
-                            rowCount++;
-                        }
-
-                    } while (hasMore && rowCount < pageSizeInRows);
-                    if (!mutations.isEmpty()) {
-                        checkForRegionClosingOrSplitting();
-                        commitBatchWithRetries(region, mutations, blockingMemstoreSize);
-                    }
-                }
-            } catch (IOException e) {
-                hasMore = false;
-                LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
-                throw e;
-            } finally {
-                region.closeRegionOperation();
-            }
-            byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
-            final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
-                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
-            results.add(aggKeyValue);
-            return hasMore;
-        }
-
-        @Override
-        public long getMaxResultSize() {
-            return scan.getMaxResultSize();
-        }
-    }
-
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
-                                         final Configuration config) throws IOException {
+                                         final RegionCoprocessorEnvironment env) throws IOException {
 
-        RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, config);
+        RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
         return scanner;
     }
     
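The paged rebuild loop deleted above now lives in the new IndexRebuildRegionScanner class, which receives the observer instance (note the `this` argument in rebuildIndices). Below is a fragment-level sketch of how the scanner's batch commit can call back through the methods this commit makes public; the `ungroupedObserver` field name is an assumption, not necessarily the actual field in the new class:

    // Sketch only: flush accumulated rebuild mutations once the configured
    // batch limits are hit, then start a fresh server cache id for the next
    // batch. Mirrors the commitIfReady() removed above, rewritten against the
    // now-public observer methods.
    private byte[] commitIfReady(byte[] uuidValue) throws IOException {
        if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(),
                maxBatchSize, maxBatchSizeBytes)) {
            ungroupedObserver.checkForRegionClosing();
            ungroupedObserver.commitBatchWithRetries(region, mutations, blockingMemstoreSize);
            uuidValue = ServerCacheClient.generateId();
            mutations.clear();
        }
        return uuidValue;
    }
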
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1e8649e..6e12950 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -125,13 +125,14 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-    private static boolean failPreIndexUpdatesForTesting = false;
-    private static boolean failPostIndexUpdatesForTesting = false;
-    private static boolean failDataTableUpdatesForTesting = false;
+  private static boolean ignoreIndexRebuildForTesting  = false;
+  private static boolean failPreIndexUpdatesForTesting = false;
+  private static boolean failPostIndexUpdatesForTesting = false;
+  private static boolean failDataTableUpdatesForTesting = false;
 
-  public static void setFailPreIndexUpdatesForTesting(boolean fail) {
-        failPreIndexUpdatesForTesting = fail;
-    }
+  public static void setIgnoreIndexRebuildForTesting(boolean ignore) { ignoreIndexRebuildForTesting = ignore; }
+
+  public static void setFailPreIndexUpdatesForTesting(boolean fail) { failPreIndexUpdatesForTesting = fail; }
 
   public static void setFailPostIndexUpdatesForTesting(boolean fail) { failPostIndexUpdatesForTesting = fail; }
 
@@ -796,8 +797,10 @@ public class IndexRegionObserver extends BaseRegionObserver {
 
   private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
                      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      if (ignoreIndexRebuildForTesting && context.rebuild) {
+          return;
+      }
       long start = EnvironmentEdgeManager.currentTimeMillis();
-
       try {
           if (failPreIndexUpdatesForTesting) {
               throw new DoNotRetryIOException("Simulating the first (i.e., pre) index table write failure");
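The new ignoreIndexRebuildForTesting flag makes doPre skip index writes on the rebuild path, which is how a test can manufacture missing index rows for the verifier to catch. A hedged sketch of the intended usage pattern (the surrounding test code is hypothetical):

    // Hypothetical test fragment: suppress index writes during rebuild so a
    // later IndexTool run with -v has verification failures to report.
    IndexRegionObserver.setIgnoreIndexRebuildForTesting(true);
    try {
        // ... trigger the index rebuild here; index rows are silently skipped ...
    } finally {
        // Static test flags are process-wide; always reset them.
        IndexRegionObserver.setIgnoreIndexRebuildForTesting(false);
    }
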
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 7e61b2d..36af59e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.compile.*;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -42,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexVerifyType;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 /**
@@ -92,6 +92,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
             try {
                 scan.setTimeRange(0, scn);
                 scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
+                scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE, getIndexVerifyType(configuration).toBytes());
             } catch (IOException e) {
                 throw new SQLException(e);
             }
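These two attributes are what the server-side rebuild scanner keys off. A minimal client-side sketch of the same setup, outside the input format (Scan comes from org.apache.hadoop.hbase.client; TRUE_BYTES and the attribute names are the ones used above):

    Scan scan = new Scan();
    // Page the rebuild so one next() call processes a bounded number of rows.
    scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
    // Ask the region server to verify index rows after rebuilding them.
    scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE,
            IndexTool.IndexVerifyType.AFTER.toBytes());
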
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 9a589b9..dcabdfd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -71,6 +72,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.phoenix.compile.PostIndexDDLCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
@@ -86,12 +88,14 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.EquiDepthStreamHistogram;
@@ -112,6 +116,59 @@ import com.google.common.collect.Lists;
  *
  */
 public class IndexTool extends Configured implements Tool {
+    public enum IndexVerifyType {
+        BEFORE("BEFORE"),
+        AFTER("AFTER"),
+        BOTH("BOTH"),
+        ONLY("ONLY"),
+        NONE("NONE");
+        private String value;
+        private byte[] valueBytes;
+
+        private IndexVerifyType(String value) {
+            this.value = value;
+            this.valueBytes = PVarchar.INSTANCE.toBytes(value);
+        }
+
+        public String getValue() {
+            return this.value;
+        }
+
+        public byte[] toBytes() {
+            return this.valueBytes;
+        }
+
+        public static IndexVerifyType fromValue(String value) {
+            for (IndexVerifyType verifyType: IndexVerifyType.values()) {
+                if (value.equals(verifyType.getValue())) {
+                    return verifyType;
+                }
+            }
+            throw new IllegalStateException("Invalid value: "+ value + " for " + IndexVerifyType.class);
+        }
+
+        public static IndexVerifyType fromValue(byte[] value) {
+            return fromValue(Bytes.toString(value));
+        }
+    }
+
+    public final static String OUTPUT_TABLE_NAME = "PHOENIX_INDEX_TOOL";
+    public final static byte[] OUTPUT_TABLE_NAME_BYTES = Bytes.toBytes(OUTPUT_TABLE_NAME);
+    public final static byte[] OUTPUT_TABLE_COLUMN_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+    public final static String DATA_TABLE_NAME = "DTName";
+    public final static byte[] DATA_TABLE_NAME_BYTES = Bytes.toBytes(DATA_TABLE_NAME);
+    public static String INDEX_TABLE_NAME = "ITName";
+    public final static byte[] INDEX_TABLE_NAME_BYTES = Bytes.toBytes(INDEX_TABLE_NAME);
+    public static String DATA_TABLE_ROW_KEY = "DTRowKey";
+    public final static byte[] DATA_TABLE_ROW_KEY_BYTES = Bytes.toBytes(DATA_TABLE_ROW_KEY);
+    public static String INDEX_TABLE_ROW_KEY = "ITRowKey";
+    public final static byte[] INDEX_TABLE_ROW_KEY_BYTES = Bytes.toBytes(INDEX_TABLE_ROW_KEY);
+    public static String DATA_TABLE_TS = "DTTS";
+    public final static byte[] DATA_TABLE_TS_BYTES = Bytes.toBytes(DATA_TABLE_TS);
+    public static String INDEX_TABLE_TS = "ITTS";
+    public final static byte[] INDEX_TABLE_TS_BYTES = Bytes.toBytes(INDEX_TABLE_TS);
+    public static String ERROR_MESSAGE = "Error";
+    public final static byte[] ERROR_MESSAGE_BYTES = Bytes.toBytes(ERROR_MESSAGE);
 
     private static final Logger LOGGER = LoggerFactory.getLogger(IndexTool.class);
 
@@ -119,6 +176,7 @@ public class IndexTool extends Configured implements Tool {
     private String dataTable;
     private String indexTable;
     private boolean isPartialBuild;
+    private IndexVerifyType indexVerifyType = IndexVerifyType.NONE;
     private String qDataTable;
     private String qIndexTable;
     private boolean useSnapshot;
@@ -144,6 +202,17 @@ public class IndexTool extends Configured implements Tool {
     private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false,
             "This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility.");
 
+    private static final Option VERIFY_OPTION = new Option("v", "verify", true,
+            "To verify that every data table row has a corresponding index row. The accepted values are NONE, ONLY, BEFORE," +
+                    " AFTER, and BOTH. NONE means no inline verification and is the default for this option. " +
+                    "ONLY verifies index rows without rebuilding them. The rest verify before, after, or " +
+                    "both before and after rebuilding index rows. If verification is done before rebuilding, " +
+                    "index rows found to be correct are not rebuilt. Currently supported values are NONE, ONLY and AFTER.");
+
+    private static final Option ONLY_VERIFY_OPTION = new Option("ov", "only-verify", false,
+            "To verify every data table row has the corresponding index row with the correct content " +
+            "(without building the index table). If the verify option is set then the only-verify option will be ignored");
+
     private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
 
     private static final Option SPLIT_INDEX_OPTION =
@@ -189,6 +258,8 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(INDEX_TABLE_OPTION);
         options.addOption(PARTIAL_REBUILD_OPTION);
         options.addOption(DIRECT_API_OPTION);
+        options.addOption(VERIFY_OPTION);
+        options.addOption(ONLY_VERIFY_OPTION);
         options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
         options.addOption(SNAPSHOT_OPTION);
@@ -510,6 +581,7 @@ public class IndexTool extends Configured implements Tool {
             PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
             PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
 
+            PhoenixConfigurationUtil.setIndexVerifyType(configuration, indexVerifyType);
             String physicalIndexTable = pIndexTable.getPhysicalName().getString();
 
             PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable);
@@ -569,6 +641,21 @@ public class IndexTool extends Configured implements Tool {
         return job;
     }
 
+    private void createIndexToolOutputTable(Connection connection) throws Exception {
+        ConnectionQueryServices queryServices = connection.unwrap(PhoenixConnection.class).getQueryServices();
+        Admin admin = queryServices.getAdmin();
+        if (admin.tableExists(TableName.valueOf(OUTPUT_TABLE_NAME))) {
+            return;
+        }
+        HTableDescriptor tableDescriptor = new
+                HTableDescriptor(TableName.valueOf(OUTPUT_TABLE_NAME));
+        tableDescriptor.setValue("DISABLE_TABLE_SOR", "true");
+        tableDescriptor.setValue(HColumnDescriptor.TTL, String.valueOf(MetaDataProtocol.DEFAULT_LOG_TTL));
+        HColumnDescriptor columnDescriptor = new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY);
+        tableDescriptor.addFamily(columnDescriptor);
+        admin.createTable(tableDescriptor);
+    }
+
     @Override
     public int run(String[] args) throws Exception {
         Connection connection = null;
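Verification output lands in the PHOENIX_INDEX_TOOL HBase table created above. The exact row layout is produced by IndexRebuildRegionScanner (not shown in this diff), so treat the qualifier usage in this sketch as an assumption based on the constants defined earlier:

    // Hedged sketch: scan the output table and print any recorded error
    // messages, using the plain HBase 1.x client API.
    try (org.apache.hadoop.hbase.client.Connection hbaseConn =
                 ConnectionFactory.createConnection(conf);
         Table outputTable = hbaseConn.getTable(TableName.valueOf(IndexTool.OUTPUT_TABLE_NAME));
         ResultScanner results = outputTable.getScanner(new Scan())) {
        for (Result result : results) {
            byte[] error = result.getValue(IndexTool.OUTPUT_TABLE_COLUMN_FAMILY,
                    IndexTool.ERROR_MESSAGE_BYTES);
            if (error != null) {
                System.out.println(Bytes.toString(error));
            }
        }
    }
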
@@ -592,6 +679,14 @@ public class IndexTool extends Configured implements Tool {
             dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
             indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
             isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt());
+            if (cmdLine.hasOption(VERIFY_OPTION.getOpt())) {
+                String value = cmdLine.getOptionValue(VERIFY_OPTION.getOpt());
+                indexVerifyType = IndexVerifyType.fromValue(value);
+                if (!(indexVerifyType == IndexVerifyType.NONE || indexVerifyType == IndexVerifyType.AFTER ||
+                        indexVerifyType == IndexVerifyType.ONLY)) {
+                    throw new IllegalStateException("Unsupported value for the verify option");
+                }
+            }
             qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) {
                 pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable);
@@ -606,7 +701,8 @@ public class IndexTool extends Configured implements Tool {
             pIndexTable = null;
 
             connection = ConnectionUtil.getInputConnection(configuration);
-
+            createIndexToolOutputTable(connection);
+            
             if (indexTable != null) {
                 if (!isValidIndexTable(connection, qDataTable,indexTable, tenantId)) {
                     throw new IllegalArgumentException(String.format(
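Putting the new option together, here is a hedged sketch of a programmatic invocation; the -s/-dt/-it/-op short names are assumed from IndexTool's existing options, only -v is introduced by this commit, and the table names are placeholders:

    int status = ToolRunner.run(new Configuration(), new IndexTool(), new String[] {
            "-s",  "MY_SCHEMA",          // hypothetical schema name
            "-dt", "MY_TABLE",           // hypothetical data table
            "-it", "MY_INDEX",           // hypothetical index table
            "-v",  "AFTER",              // verify index rows after rebuilding them
            "-op", "/tmp/index_rebuild"  // output path for the MR job
    });
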
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 66f0bc8..7aa41f5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -45,6 +45,7 @@ import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat;
 import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable;
+import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
@@ -146,6 +147,8 @@ public final class PhoenixConfigurationUtil {
 
     public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes";
 
+    public static final String INDEX_VERIFY_TYPE = "phoenix.mr.index.IndexVerifyType";
+
     // Generate splits based on scans from stats, or just from region splits
     public static final String MAPREDUCE_SPLIT_BY_STATS = "phoenix.mapreduce.split.by.stats";
 
@@ -537,6 +540,11 @@ public final class PhoenixConfigurationUtil {
         configuration.set(DISABLED_INDEXES, indexName);
     }
 
+    public static void setIndexVerifyType(Configuration configuration, IndexTool.IndexVerifyType verifyType) {
+        Preconditions.checkNotNull(configuration);
+        configuration.set(INDEX_VERIFY_TYPE, verifyType.getValue());
+    }
+
     public static String getScrutinyDataTableName(Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         return configuration.get(SCRUTINY_DATA_TABLE_NAME);
@@ -657,6 +665,12 @@ public final class PhoenixConfigurationUtil {
         return configuration.get(DISABLED_INDEXES);
     }
 
+    public static IndexTool.IndexVerifyType getIndexVerifyType(Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        String value = configuration.get(INDEX_VERIFY_TYPE, IndexTool.IndexVerifyType.NONE.getValue());
+        return IndexTool.IndexVerifyType.fromValue(value);
+    }
+
     public static boolean getSplitByStats(final Configuration configuration) {
         Preconditions.checkNotNull(configuration);
         boolean split = configuration.getBoolean(MAPREDUCE_SPLIT_BY_STATS, DEFAULT_SPLIT_BY_STATS);
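The setter/getter pair gives the MR job a single place to carry the verify type from the tool to the input format. A short sketch of the round trip, including the NONE default for unset configurations:

    Configuration jobConf = HBaseConfiguration.create();
    PhoenixConfigurationUtil.setIndexVerifyType(jobConf, IndexTool.IndexVerifyType.AFTER);
    assert PhoenixConfigurationUtil.getIndexVerifyType(jobConf)
            == IndexTool.IndexVerifyType.AFTER;
    // An unset configuration falls back to NONE, i.e. no inline verification.
    assert PhoenixConfigurationUtil.getIndexVerifyType(HBaseConfiguration.create())
            == IndexTool.IndexVerifyType.NONE;
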
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 9b982af..fbf7b6a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -356,7 +356,7 @@ public class QueryServicesOptions {
 
     public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
-    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 16*1024;
+    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
 
     public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
 
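The rebuild page size default doubles from 16K to 32K rows per server-side next() call. To override it, set the corresponding QueryServices key; a sketch, assuming the constant statically imported by the rebuild scanner lives in QueryServices:

    Configuration conf = HBaseConfiguration.create();
    // Assumed constant name; it is the key read via config.getLong(...) in the
    // rebuild scanner above.
    conf.setLong(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, 64 * 1024L);
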


[phoenix] 03/03: PHOENIX-5666 IndexRegionObserver incorrectly updates PostIndexUpdateFailure metric

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 7a525c1fb0763852bd6db06576ced00f9b726858
Author: Kadir <ko...@salesforce.com>
AuthorDate: Wed Jan 8 01:51:41 2020 -0800

    PHOENIX-5666 IndexRegionObserver incorrectly updates PostIndexUpdateFailure metric
---
 .../main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 6e12950..08a120e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -763,7 +763,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
       //short circuit, if we don't need to do any work
 
-      if (context == null || indexUpdates.isEmpty()) {
+      if (context == null || indexUpdates == null || indexUpdates.isEmpty()) {
           return;
       }
 

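The one-line fix adds the missing null check on the chosen update map. Note that context is still dereferenced on the line before the context null check; a fully defensive ordering of the same guard would look like this sketch:

    if (context == null) {
        return;
    }
    // Pick the pre- or post-update map only after context is known non-null.
    ListMultimap<HTableInterfaceReference, Mutation> indexUpdates =
            post ? context.postIndexUpdates : context.preIndexUpdates;
    if (indexUpdates == null || indexUpdates.isEmpty()) {
        // Nothing to write, so no index update (or failure metric) applies.
        return;
    }
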

[phoenix] 02/03: PHOENIX-5658 IndexTool to verify index rows inline (addendum - removed the unused only-verify option)

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 662d72623a95f0320e050cd9b7510628dfb40fe0
Author: Kadir <ko...@salesforce.com>
AuthorDate: Sun Jan 12 19:18:33 2020 -0800

    PHOENIX-5658 IndexTool to verify index rows inline (addendum - removed the unused only-verify option)
---
 .../src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java  | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index dcabdfd..6b77ed8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -209,10 +209,6 @@ public class IndexTool extends Configured implements Tool {
                     "both before and after rebuilding row. If the verification is done before rebuilding rows and " +
                     "the correct index rows are not rebuilt. Currently supported values are NONE, ONLY and AFTER ");
 
-    private static final Option ONLY_VERIFY_OPTION = new Option("ov", "only-verify", false,
-            "To verify every data table row has the corresponding index row with the correct content " +
-            "(without building the index table). If the verify option is set then the only-verify option will be ignored");
-
     private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
 
     private static final Option SPLIT_INDEX_OPTION =
@@ -259,7 +255,6 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(PARTIAL_REBUILD_OPTION);
         options.addOption(DIRECT_API_OPTION);
         options.addOption(VERIFY_OPTION);
-        options.addOption(ONLY_VERIFY_OPTION);
         options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
         options.addOption(SNAPSHOT_OPTION);