Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/10/24 06:15:02 UTC

[phoenix] branch 4.x-HBase-1.5 updated (3ba5464 -> cee2c7a)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a change to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


    from 3ba5464  PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers
     new 883e3bd  Revert "PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers"
     new cee2c7a  PHOENIX-5478 IndexTool mapper task should not timeout

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  72 +-----
 .../apache/phoenix/compile/PostDDLCompiler.java    | 253 +++++++++------------
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 109 ++++-----
 .../coprocessor/BaseScannerRegionObserver.java     |   2 +
 .../UngroupedAggregateRegionObserver.java          | 220 ++++++++++--------
 .../apache/phoenix/index/GlobalIndexChecker.java   |  12 +-
 .../PhoenixServerBuildIndexInputFormat.java        |  12 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  12 -
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   3 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |   9 +-
 11 files changed, 312 insertions(+), 394 deletions(-)


[phoenix] 02/02: PHOENIX-5478 IndexTool mapper task should not timeout

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit cee2c7a66f494c3e761ccdbee6a9c1353925a961
Author: Kadir <ko...@salesforce.com>
AuthorDate: Wed Oct 23 22:38:13 2019 -0700

    PHOENIX-5478 IndexTool mapper task should not timeout
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java     |   2 +
 .../UngroupedAggregateRegionObserver.java          | 220 +++++++++++----------
 .../apache/phoenix/index/GlobalIndexChecker.java   |   4 -
 .../PhoenixServerBuildIndexInputFormat.java        |   2 +
 .../apache/phoenix/mapreduce/index/IndexTool.java  |  12 --
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   3 +-
 8 files changed, 130 insertions(+), 117 deletions(-)
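
The fix replaces the old approach of stretching client, RPC, and mapper
timeouts (removed from IndexTool below) with server-side paging: each scanner
RPC rebuilds at most a configurable number of rows and then returns, so mapper
tasks make steady progress instead of sitting in one long call. A minimal
sketch of setting the new knob (property name taken from this commit; the
Configuration-based setup and the value 4096 are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    // Cap each rebuild RPC at 4096 data rows (shipped default: 1024 * 1024).
    Configuration conf = HBaseConfiguration.create();
    conf.setLong("phoenix.index.rebuild_page_size_in_rows", 4096L);

The IndexToolIT change below drives the same property down to 8 rows so the
tests exercise the paging path.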

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 2f12ae9..8af5295 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -80,7 +80,6 @@ import com.google.common.collect.Maps;
 
 @RunWith(Parameterized.class)
 public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
-
     private final boolean localIndex;
     private final boolean mutable;
     private final boolean transactional;
@@ -118,6 +117,7 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, Long.toString(5));
         serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
             QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8));
         Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
         clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true));
         clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b73615f..cb4d0af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -75,6 +75,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String GROUP_BY_LIMIT = "_GroupByLimit";
     public static final String LOCAL_INDEX = "_LocalIndex";
     public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
+    // The number of index rows to be rebuilt in one RPC call
+    public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
     /* 
     * Attribute to denote that the index maintainer has been serialized using its proto-buf presentation.
     * Needed for backward compatibility purposes. TODO: get rid of this in next major release.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 3a03f94..0a16a68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
+import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
@@ -1056,116 +1057,137 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             throw new RuntimeException(e);
         }
     }
-    
-    private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
-            Configuration config) throws IOException {
-        byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-        boolean useProto = true;
-        // for backward compatibility fall back to look up by the old attribute
-        if (indexMetaData == null) {
-            useProto = false;
-            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+
+    private class IndexRebuildRegionScanner extends BaseRegionScanner {
+        private long pageSizeInRows = Long.MAX_VALUE;
+        private boolean hasMore;
+        private final int maxBatchSize;
+        private MutationList mutations;
+        private final long maxBatchSizeBytes;
+        private final long blockingMemstoreSize;
+        private final byte[] clientVersionBytes;
+        private List<Cell> results = new ArrayList<Cell>();
+        private byte[] indexMetaData;
+        private boolean useProto = true;
+        private Scan scan;
+        private RegionScanner innerScanner;
+        final Region region;
+
+        IndexRebuildRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan,
+                                   final Configuration config) {
+            super(innerScanner);
+            if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
+                pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
+                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
+            }
+
+            maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            mutations = new MutationList(maxBatchSize);
+            maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+            blockingMemstoreSize = getBlockingMemstoreSize(region, config);
+            clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+            indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+            if (indexMetaData == null) {
+                useProto = false;
+                indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+            }
+            this.scan = scan;
+            this.innerScanner = innerScanner;
+            this.region = region;
         }
-        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
-        boolean hasMore;
-        int rowCount = 0;
-        try {
-            int maxBatchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            long maxBatchSizeBytes = config.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
-                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
-            final long blockingMemstoreSize = getBlockingMemstoreSize(region, config);
-            MutationList mutations = new MutationList(maxBatchSize);
-            region.startRegionOperation();
-            byte[] uuidValue = ServerCacheClient.generateId();
-            synchronized (innerScanner) {
-                do {
-                    List<Cell> results = new ArrayList<Cell>();
-                    hasMore = innerScanner.nextRaw(results);
-                    if (!results.isEmpty()) {
-                        Put put = null;
-                        Delete del = null;
-                        for (Cell cell : results) {
-
-                            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
-                                if (put == null) {
-                                    put = new Put(CellUtil.cloneRow(cell));
-                                    put.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-                                    put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-                                    put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
-                                    mutations.add(put);
-                                    // Since we're replaying existing mutations, it makes no sense to write them to the wal
-                                    put.setDurability(Durability.SKIP_WAL);
-                                }
-                                put.add(cell);
-                            } else {
-                                if (del == null) {
-                                    del = new Delete(CellUtil.cloneRow(cell));
-                                    del.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
-                                    del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                    del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-                                    del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
-                                    mutations.add(del);
-                                    // Since we're replaying existing mutations, it makes no sense to write them to the wal
-                                    del.setDurability(Durability.SKIP_WAL);
+        @Override
+        public HRegionInfo getRegionInfo() {
+            return region.getRegionInfo();
+        }
+
+        @Override
+        public boolean isFilterDone() { return hasMore; }
+
+        @Override
+        public void close() throws IOException { innerScanner.close(); }
+
+        private void setMutationAttributes(Mutation m, byte[] uuidValue) {
+            m.setAttribute(useProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMetaData);
+            m.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+            m.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
+                    BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+            m.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersionBytes);
+            // Since we're replaying existing mutations, it makes no sense to write them to the wal
+            m.setDurability(Durability.SKIP_WAL);
+        }
+
+        @Override
+        public boolean next(List<Cell> results) throws IOException {
+            int rowCount = 0;
+            try {
+                byte[] uuidValue = ServerCacheClient.generateId();
+                synchronized (innerScanner) {
+                    do {
+                        List<Cell> row = new ArrayList<Cell>();
+                        hasMore = innerScanner.nextRaw(row);
+                        if (!row.isEmpty()) {
+                            Put put = null;
+                            Delete del = null;
+                            for (Cell cell : row) {
+                                if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                    if (put == null) {
+                                        put = new Put(CellUtil.cloneRow(cell));
+                                        setMutationAttributes(put, uuidValue);
+                                        mutations.add(put);
+                                    }
+                                    put.add(cell);
+                                } else {
+                                    if (del == null) {
+                                        del = new Delete(CellUtil.cloneRow(cell));
+                                        setMutationAttributes(del, uuidValue);
+                                        mutations.add(del);
+                                    }
+                                    del.addDeleteMarker(cell);
                                 }
-                                del.addDeleteMarker(cell);
                             }
+                            if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+                                checkForRegionClosingOrSplitting();
+                                commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+                                uuidValue = ServerCacheClient.generateId();
+                                mutations.clear();
+                            }
+                            rowCount++;
                         }
-                        if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            checkForRegionClosingOrSplitting();
-                            commitBatchWithRetries(region, mutations, blockingMemstoreSize);
-                            uuidValue = ServerCacheClient.generateId();
-                            mutations.clear();
-                        }
-                        rowCount++;
+
+                    } while (hasMore && rowCount < pageSizeInRows);
+                    if (!mutations.isEmpty()) {
+                        checkForRegionClosingOrSplitting();
+                        commitBatchWithRetries(region, mutations, blockingMemstoreSize);
                     }
-                    
-                } while (hasMore);
-                if (!mutations.isEmpty()) {
-                    checkForRegionClosingOrSplitting();
-                    commitBatchWithRetries(region, mutations, blockingMemstoreSize);
+                }
+            } catch (IOException e) {
+                hasMore = false;
+                LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
+                throw e;
+            } finally {
+                if (!hasMore) {
+                    region.closeRegionOperation();
                 }
             }
-        } catch (IOException e) {
-            LOGGER.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
-            throw e;
-        } finally {
-            region.closeRegionOperation();
+            byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+            final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+            results.add(aggKeyValue);
+            return hasMore;
         }
-        byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
-        final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
-                SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
-
-        RegionScanner scanner = new BaseRegionScanner(innerScanner) {
-            @Override
-            public HRegionInfo getRegionInfo() {
-                return region.getRegionInfo();
-            }
-
-            @Override
-            public boolean isFilterDone() {
-                return true;
-            }
 
-            @Override
-            public void close() throws IOException {
-                innerScanner.close();
-            }
+        @Override
+        public long getMaxResultSize() {
+            return scan.getMaxResultSize();
+        }
+    }
 
-            @Override
-            public boolean next(List<Cell> results) throws IOException {
-                results.add(aggKeyValue);
-                return false;
-            }
+    private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
+                                         final Configuration config) throws IOException {
 
-            @Override
-            public long getMaxResultSize() {
-                return scan.getMaxResultSize();
-            }
-        };
+        region.startRegionOperation();
+        RegionScanner scanner = new IndexRebuildRegionScanner(innerScanner, region, scan, config);
         return scanner;
     }
     
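A condensed view of the paging contract the new scanner implements (an
illustrative skeleton only, assuming the BaseRegionScanner delegate
constructor used above; the real IndexRebuildRegionScanner also batches
mutations, checks for region close/split, commits with retries, and returns
the per-page row count as a single aggregate KeyValue):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.phoenix.coprocessor.BaseRegionScanner;

    class PagedRebuildScanner extends BaseRegionScanner {
        private final RegionScanner inner;
        private final long pageSizeInRows;
        private boolean hasMore;

        PagedRebuildScanner(RegionScanner inner, long pageSizeInRows) {
            super(inner);
            this.inner = inner;
            this.pageSizeInRows = pageSizeInRows;
        }

        @Override
        public boolean next(List<Cell> results) throws IOException {
            int rowCount = 0;
            do {
                List<Cell> row = new ArrayList<Cell>();
                hasMore = inner.nextRaw(row);
                if (!row.isEmpty()) {
                    // ... build and commit index mutations for this data row ...
                    rowCount++;
                }
            } while (hasMore && rowCount < pageSizeInRows); // stop at the page boundary
            // Returning true signals that another page remains, so the caller
            // issues a fresh, short RPC rather than one long-running call.
            return hasMore;
        }
    }
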
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 1b3b9c3..80e036e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -253,10 +253,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                             return result.getRow();
                         }
                     };
-                    for (Cell cell : result.rawCells()) {
-                        String cellString = cell.toString();
-                        LOG.debug("Rebuilt row :" + cellString + " value : " + Bytes.toStringBinary(CellUtil.cloneValue(cell)));
-                    }
                     byte[] builtIndexRowKey = indexMaintainer.buildRowKey(getter, new ImmutableBytesWritable(dataRowKey), null, null, maxTimestamp);
                     if (Bytes.compareTo(builtIndexRowKey, 0, builtIndexRowKey.length,
                             indexRowKey, 0, indexRowKey.length) != 0) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 1abcef4..7e61b2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolDataTableName;
 import static org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.getIndexToolIndexTableName;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 /**
  * {@link InputFormat} implementation from Phoenix for building index
@@ -90,6 +91,7 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
 
             try {
                 scan.setTimeRange(0, scn);
+                scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);
             } catch (IOException e) {
                 throw new SQLException(e);
             }
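
The attribute added here is the client-side half of the paging switch: the
server-side scanner pages only when the scan carries it, so rebuild scans
issued by other paths keep their old behavior. Combining the two sides of
this change in sketch form (names from the diffs above):

    // Client side (IndexTool input format): tag the rebuild scan.
    scan.setAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING, TRUE_BYTES);

    // Server side (IndexRebuildRegionScanner constructor): honor the tag.
    if (scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_PAGING) != null) {
        pageSizeInRows = config.getLong(INDEX_REBUILD_PAGE_SIZE_IN_ROWS,
                QueryServicesOptions.DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS);
    }   // otherwise pageSizeInRows stays Long.MAX_VALUE, i.e. no paging
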
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index dbaeead..22013e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -500,17 +500,6 @@ public class IndexTool extends Configured implements Tool {
         private Job configureJobForServerBuildIndex()
                 throws Exception {
 
-            long indexRebuildQueryTimeoutMs =
-                    configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
-                            QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
-            // Set various phoenix and hbase level timeouts and rpc retries
-            configuration.set(QueryServices.THREAD_TIMEOUT_MS_ATTRIB,
-                    Long.toString(indexRebuildQueryTimeoutMs));
-            configuration.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
-                    Long.toString(indexRebuildQueryTimeoutMs));
-            configuration.set(HConstants.HBASE_RPC_TIMEOUT_KEY,
-                    Long.toString(indexRebuildQueryTimeoutMs));
-
             PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, qDataTable);
             PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable);
 
@@ -526,7 +515,6 @@ public class IndexTool extends Configured implements Tool {
                 fs = outputPath.getFileSystem(configuration);
                 fs.delete(outputPath, true);
             }
-            configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs));
             final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index ffeec51..93e218e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -352,6 +352,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled";
     // Enable support for long view index(default is false)
     public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled";
+    // The number of index rows to be rebuilt in one RPC call
+    public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
 
 
     // Before 4.15 when we created a view we included the parent table column metadata in the view
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index eb4cf7c..fe51f89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -209,7 +209,7 @@ public class QueryServicesOptions {
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME = 1; // 1 ms
     public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME = 60000 * 3; // 3 mins
     // 30 min rpc timeout * 5 tries, with 2100ms total pause time between retries
-    public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = 60000 * 60 * 24; // 24 hrs
+    public static final long DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT = (5 * 30000 * 60) + 2100;
     public static final long DEFAULT_INDEX_REBUILD_RPC_TIMEOUT = 30000 * 60; // 30 mins
     public static final long DEFAULT_INDEX_REBUILD_CLIENT_SCANNER_TIMEOUT = 30000 * 60; // 30 mins
     public static final int DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER = 5; // 5 total tries at rpc level
@@ -358,6 +358,7 @@ public class QueryServicesOptions {
 
     public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
+    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 1024*1024;
 
     public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
 

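The restored default matches the arithmetic in the comment above it; spelled
out (illustrative breakdown only):

    long rpcTimeoutMs = 30000 * 60;  // 30-minute RPC timeout (DEFAULT_INDEX_REBUILD_RPC_TIMEOUT)
    int  rpcRetries   = 5;           // DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER
    long totalPauseMs = 2100;        // total pause between retries
    long queryTimeout = rpcRetries * rpcTimeoutMs + totalPauseMs; // 9,002,100 ms, ~2.5 hours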

[phoenix] 01/02: Revert "PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers"

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 883e3bd4ffb7b040fc2547d2165bec50477c2acd
Author: Kadir <ko...@salesforce.com>
AuthorDate: Wed Oct 23 08:55:28 2019 -0700

    Revert "PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should replay delete markers"
    
    This reverts commit 3ba54648c0c2afee2028f3ed05c3d34ec030cb5d.
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  72 +-----
 .../apache/phoenix/compile/PostDDLCompiler.java    | 253 +++++++++------------
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 109 ++++-----
 .../apache/phoenix/index/GlobalIndexChecker.java   |   8 +-
 .../PhoenixServerBuildIndexInputFormat.java        |  10 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |   9 +-
 6 files changed, 183 insertions(+), 278 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 87f6b20..2f12ae9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -29,7 +29,6 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -40,7 +39,6 @@ import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -51,7 +49,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.end2end.index.PartialIndexRebuilderIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -80,12 +77,10 @@ import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @RunWith(Parameterized.class)
 public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
-    private static final Logger LOGGER = LoggerFactory.getLogger(PartialIndexRebuilderIT.class);
+
     private final boolean localIndex;
     private final boolean mutable;
     private final boolean transactional;
@@ -259,71 +254,6 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
-    private void setEveryNthRowWithNull(int nrows, int nthRowNull, PreparedStatement stmt) throws Exception {
-        for (int i = 0; i < nrows; i++) {
-            stmt.setInt(1, i);
-            stmt.setInt(2, i * 10);
-            if (i % nthRowNull != 0) {
-                stmt.setInt(3, 9000 + i * nthRowNull);
-            } else {
-                stmt.setNull(3, Types.INTEGER);
-            }
-            stmt.execute();
-        }
-    }
-
-    @Test
-    public void testWithSetNull() throws Exception {
-        // This test is for building non-transactional mutable global indexes with direct api
-        if (localIndex || transactional || !mutable) {
-            return;
-        }
-        // This tests the cases where a column having a null value is overwritten with a not null value and vice versa;
-        // and after that the index table is still rebuilt correctly
-        final int NROWS = 2 * 3 * 5 * 7;
-        String schemaName = generateUniqueName();
-        String dataTableName = generateUniqueName();
-        String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        String indexTableName = generateUniqueName();
-        String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            String stmString1 =
-                    "CREATE TABLE " + dataTableFullName
-                            + " (ID INTEGER NOT NULL PRIMARY KEY, VAL INTEGER, ZIP INTEGER) "
-                            + tableDDLOptions;
-            conn.createStatement().execute(stmString1);
-            String upsertStmt = "UPSERT INTO " + dataTableFullName + " VALUES(?,?,?)";
-            PreparedStatement stmt = conn.prepareStatement(upsertStmt);
-            setEveryNthRowWithNull(NROWS, 2, stmt);
-            conn.commit();
-            setEveryNthRowWithNull(NROWS, 3, stmt);
-            conn.commit();
-            String stmtString2 =
-                    String.format(
-                            "CREATE %s INDEX %s ON %s (VAL) INCLUDE (ZIP) ASYNC ",
-                            (localIndex ? "LOCAL" : ""), indexTableName, dataTableFullName);
-            conn.createStatement().execute(stmtString2);
-            // Run the index MR job and verify that the index table is built correctly
-            IndexTool indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
-            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-
-            // Repeat the test with compaction
-            setEveryNthRowWithNull(NROWS, 5, stmt);
-            conn.commit();
-            setEveryNthRowWithNull(NROWS, 7, stmt);
-            conn.commit();
-            TestUtil.doMajorCompaction(conn, dataTableFullName);
-            // Run the index MR job and verify that the index table is built correctly
-            indexTool = runIndexTool(directApi, useSnapshot, schemaName, dataTableName, indexTableName, null, 0, new String[0]);
-            assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
-            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
-            assertEquals(NROWS, actualRowCount);
-        }
-    }
-
     @Test
     public void testBuildSecondaryIndexAndScrutinize() throws Exception {
         // This test is for building non-transactional global indexes with direct api
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 04a3188..a74c5f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -72,7 +72,6 @@ import com.google.common.collect.Lists;
 public class PostDDLCompiler {
     private final PhoenixConnection connection;
     private final Scan scan;
-    private PostDDLMutationPlan mutationPlan = null;
 
     public PostDDLCompiler(PhoenixConnection connection) {
         this(connection, new Scan());
@@ -92,12 +91,7 @@ public class PostDDLCompiler {
             new MultipleTableRefColumnResolver(tableRefs),
                 scan,
                 new SequenceManager(statement));
-        this.mutationPlan = new PostDDLMutationPlan(context, tableRefs, timestamp, emptyCF, deleteList, projectCFs);
-        return this.mutationPlan;
-    }
-
-    public QueryPlan getQueryPlan(TableRef tableRef) throws SQLException {
-        return mutationPlan.getQueryPlan(tableRef);
+        return new PostDDLMutationPlan(context, tableRefs, timestamp, emptyCF, deleteList, projectCFs);
     }
 
     private static class MultipleTableRefColumnResolver implements ColumnResolver {
@@ -171,120 +165,6 @@ public class PostDDLCompiler {
             this.projectCFs = projectCFs;
         }
 
-        public QueryPlan getQueryPlan(final TableRef tableRef) throws SQLException {
-            if (tableRefs.isEmpty()) {
-                return null;
-            }
-            QueryPlan plan = null;
-            boolean wasAutoCommit = connection.getAutoCommit();
-            try {
-                connection.setAutoCommit(true);
-                SQLException sqlE = null;
-                /*
-                 * Handles:
-                 * 1) deletion of all rows for a DROP TABLE and subsequently deletion of all rows for a DROP INDEX;
-                 * 2) deletion of all column values for a ALTER TABLE DROP COLUMN
-                 * 3) updating the necessary rows to have an empty KV
-                 * 4) updating table stats
-                 */
-
-                Scan scan = ScanUtil.newScan(context.getScan());
-                SelectStatement select = SelectStatement.COUNT_ONE;
-                // We need to use this tableRef
-                ColumnResolver resolver = new SingleTableRefColumnResolver(tableRef);
-                PhoenixStatement statement = new PhoenixStatement(connection);
-                StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
-                long ts = timestamp;
-                // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
-                // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
-                // in this case, so maybe this is ok.
-                if (ts!= HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
-                    ts = TransactionUtil.convertToNanoseconds(ts);
-                }
-                ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
-                if (emptyCF != null) {
-                    scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
-                    scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
-                }
-                ServerCache cache = null;
-                try {
-                    if (deleteList != null) {
-                        if (deleteList.isEmpty()) {
-                            scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
-                            // In the case of a row deletion, add index metadata so mutable secondary indexing works
-                            /* TODO: we currently manually run a scan to delete the index data here
-                            ImmutableBytesWritable ptr = context.getTempPtr();
-                            tableRef.getTable().getIndexMaintainers(ptr);
-                            if (ptr.getLength() > 0) {
-                                IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
-                                byte[] uuidValue = cache.getId();
-                                scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                            }
-                            */
-                        } else {
-                            // In the case of the empty key value column family changing, do not send the index
-                            // metadata, as we're currently managing this from the client. It's possible for the
-                            // data empty column family to stay the same, while the index empty column family
-                            // changes.
-                            PColumn column = deleteList.get(0);
-                            byte[] cq = column.getColumnQualifierBytes();
-                            if (emptyCF == null) {
-                                scan.addColumn(column.getFamilyName().getBytes(), cq);
-                            }
-                            scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes());
-                            scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
-                        }
-                    }
-                    List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
-                    if (projectCFs == null) {
-                        for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
-                            columnFamilies.add(family.getName().getBytes());
-                        }
-                    } else {
-                        for (byte[] projectCF : projectCFs) {
-                            columnFamilies.add(projectCF);
-                        }
-                    }
-                    // Need to project all column families into the scan, since we haven't yet created our empty key value
-                    RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
-                    context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
-                    // Explicitly project these column families and don't project the empty key value,
-                    // since at this point we haven't added the empty key value everywhere.
-                    if (columnFamilies != null) {
-                        scan.getFamilyMap().clear();
-                        for (byte[] family : columnFamilies) {
-                            scan.addFamily(family);
-                        }
-                        projector = new RowProjector(projector,false);
-                    }
-                    // Ignore exceptions due to not being able to resolve any view columns,
-                    // as this just means the view is invalid. Continue on and try to perform
-                    // any other Post DDL operations.
-                    try {
-                        // Since dropping a VIEW does not affect the underlying data, we do
-                        // not need to pass through the view statement here.
-                        WhereCompiler.compile(context, select); // Push where clause into scan
-                    } catch (ColumnFamilyNotFoundException e) {
-                        return null;
-                    } catch (ColumnNotFoundException e) {
-                        return null;
-                    } catch (AmbiguousColumnException e) {
-                        return null;
-                    }
-                    plan = new AggregatePlan(context, select, tableRef, projector, null, null,
-                            OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null);
-                } finally {
-                    if (cache != null) { // Remove server cache if there is one
-                        cache.close();
-                    }
-                }
-            } finally {
-                if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit);
-            }
-            return plan;
-        }
-
         @Override
         public MutationState execute() throws SQLException {
             if (tableRefs.isEmpty()) {
@@ -303,36 +183,125 @@ public class PostDDLCompiler {
                  */
                 long totalMutationCount = 0;
                 for (final TableRef tableRef : tableRefs) {
-                    QueryPlan plan = getQueryPlan(tableRef);
-                    if (plan == null)
-                        continue;
+                    Scan scan = ScanUtil.newScan(context.getScan());
+                    SelectStatement select = SelectStatement.COUNT_ONE;
+                    // We need to use this tableRef
+                    ColumnResolver resolver = new SingleTableRefColumnResolver(tableRef);
+                    PhoenixStatement statement = new PhoenixStatement(connection);
+                    StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
+                    long ts = timestamp;
+                    // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
+                    // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
+                    // in this case, so maybe this is ok.
+                    if (ts!= HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
+                        ts = TransactionUtil.convertToNanoseconds(ts);
+                    }
+                    ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
+                    if (emptyCF != null) {
+                        scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
+                        scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
+                    }
+                    ServerCache cache = null;
                     try {
-                        ResultIterator iterator = plan.iterator();
+                        if (deleteList != null) {
+                            if (deleteList.isEmpty()) {
+                                scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+                                // In the case of a row deletion, add index metadata so mutable secondary indexing works
+                                /* TODO: we currently manually run a scan to delete the index data here
+                                ImmutableBytesWritable ptr = context.getTempPtr();
+                                tableRef.getTable().getIndexMaintainers(ptr);
+                                if (ptr.getLength() > 0) {
+                                    IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
+                                    cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                                    byte[] uuidValue = cache.getId();
+                                    scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                }
+                                */
+                            } else {
+                                // In the case of the empty key value column family changing, do not send the index
+                                // metadata, as we're currently managing this from the client. It's possible for the
+                                // data empty column family to stay the same, while the index empty column family
+                                // changes.
+                                PColumn column = deleteList.get(0);
+                                byte[] cq = column.getColumnQualifierBytes();
+                                if (emptyCF == null) {
+                                    scan.addColumn(column.getFamilyName().getBytes(), cq);
+                                }
+                                scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, column.getFamilyName().getBytes());
+                                scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
+                            }
+                        }
+                        List<byte[]> columnFamilies = Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
+                        if (projectCFs == null) {
+                            for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
+                                columnFamilies.add(family.getName().getBytes());
+                            }
+                        } else {
+                            for (byte[] projectCF : projectCFs) {
+                                columnFamilies.add(projectCF);
+                            }
+                        }
+                        // Need to project all column families into the scan, since we haven't yet created our empty key value
+                        RowProjector projector = ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, GroupBy.EMPTY_GROUP_BY);
+                        context.getAggregationManager().compile(context, GroupBy.EMPTY_GROUP_BY);
+                        // Explicitly project these column families and don't project the empty key value,
+                        // since at this point we haven't added the empty key value everywhere.
+                        if (columnFamilies != null) {
+                            scan.getFamilyMap().clear();
+                            for (byte[] family : columnFamilies) {
+                                scan.addFamily(family);
+                            }
+                            projector = new RowProjector(projector,false);
+                        }
+                        // Ignore exceptions due to not being able to resolve any view columns,
+                        // as this just means the view is invalid. Continue on and try to perform
+                        // any other Post DDL operations.
+                        try {
+                            // Since dropping a VIEW does not affect the underlying data, we do
+                            // not need to pass through the view statement here.
+                            WhereCompiler.compile(context, select); // Push where clause into scan
+                        } catch (ColumnFamilyNotFoundException e) {
+                            continue;
+                        } catch (ColumnNotFoundException e) {
+                            continue;
+                        } catch (AmbiguousColumnException e) {
+                            continue;
+                        }
+                        QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null,
+                                OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null);
                         try {
-                            Tuple row = iterator.next();
-                            ImmutableBytesWritable ptr = context.getTempPtr();
-                            totalMutationCount += (Long)plan.getProjector().getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
-                        } catch (SQLException e) {
-                            sqlE = e;
-                        } finally {
+                            ResultIterator iterator = plan.iterator();
                             try {
-                                iterator.close();
+                                Tuple row = iterator.next();
+                                ImmutableBytesWritable ptr = context.getTempPtr();
+                                totalMutationCount += (Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
                             } catch (SQLException e) {
-                                if (sqlE == null) {
-                                    sqlE = e;
-                                } else {
-                                    sqlE.setNextException(e);
-                                }
+                                sqlE = e;
                             } finally {
-                                if (sqlE != null) {
-                                    throw sqlE;
+                                try {
+                                    iterator.close();
+                                } catch (SQLException e) {
+                                    if (sqlE == null) {
+                                        sqlE = e;
+                                    } else {
+                                        sqlE.setNextException(e);
+                                    }
+                                } finally {
+                                    if (sqlE != null) {
+                                        throw sqlE;
+                                    }
                                 }
                             }
+                        } catch (TableNotFoundException e) {
+                            // Ignore and continue, as HBase throws when table hasn't been written to
+                            // FIXME: Remove if this is fixed in 0.96
+                        }
+                    } finally {
+                        if (cache != null) { // Remove server cache if there is one
+                            cache.close();
                         }
-                    } catch (TableNotFoundException e) {
-                        // Ignore and continue, as HBase throws when table hasn't been written to
-                        // FIXME: Remove if this is fixed in 0.96
                     }
+
                 }
                 final long count = totalMutationCount;
                 return new MutationState(1, 1000, connection) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 40cea2c..7d1c1b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -19,16 +19,15 @@ package org.apache.phoenix.compile;
 
 import java.sql.SQLException;
 import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -37,22 +36,24 @@ import org.apache.phoenix.schema.*;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.StringUtil;
+
+import com.google.common.collect.Lists;
 
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 
 /**
- * Class that compiles queryPlan to generate initial data values after a DDL command for
+ * Class that compiles plan to generate initial data values after a DDL command for
  * index table.
  */
 public class ServerBuildIndexCompiler {
     private final PhoenixConnection connection;
-    private final String indexTableFullName;
-    private final String dataTableFullName;
+    private final String tableName;
     private PTable dataTable;
-    private QueryPlan queryPlan;
+    private QueryPlan plan;
 
     private class RowCountMutationPlan extends BaseMutationPlan {
         private RowCountMutationPlan(StatementContext context, PhoenixStatement.Operation operation) {
@@ -61,7 +62,7 @@ public class ServerBuildIndexCompiler {
         @Override
         public MutationState execute() throws SQLException {
             connection.getMutationState().commitDDLFence(dataTable);
-            Tuple tuple = queryPlan.iterator().next();
+            Tuple tuple = plan.iterator().next();
             long rowCount = 0;
             if (tuple != null) {
                 Cell kv = tuple.getValue(0);
@@ -77,57 +78,61 @@ public class ServerBuildIndexCompiler {
 
         @Override
         public QueryPlan getQueryPlan() {
-            return queryPlan;
+            return plan;
         }
     };
     
-    public ServerBuildIndexCompiler(PhoenixConnection connection, String dataTableFullName, String indexTableFullName) {
+    public ServerBuildIndexCompiler(PhoenixConnection connection, String tableName) {
         this.connection = connection;
-        this.dataTableFullName = dataTableFullName;
-        this.indexTableFullName = indexTableFullName;
+        this.tableName = tableName;
     }
 
-    public MutationPlan compile() throws SQLException {
-        PTable index = PhoenixRuntime.getTable(connection, indexTableFullName);
-        dataTable = PhoenixRuntime.getTable(connection, dataTableFullName);
-        if (index.getIndexType() == PTable.IndexType.GLOBAL &&  dataTable.isTransactional()) {
-            throw new IllegalArgumentException(
-                    "ServerBuildIndexCompiler does not support global indexes on transactional tables");
-        }
-        PostDDLCompiler compiler = new PostDDLCompiler(connection);
-        TableRef dataTableRef = new TableRef(dataTable);
-        compiler.compile(Collections.singletonList(dataTableRef),
-                null, null, null, HConstants.LATEST_TIMESTAMP);
-        queryPlan = compiler.getQueryPlan(dataTableRef);
-        Scan dataTableScan = IndexManagementUtil.newLocalStateScan(queryPlan.getContext().getScan(),
-                Collections.singletonList(index.getIndexMaintainer(dataTable, connection)));
-        ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
-        IndexMaintainer.serializeAdditional(dataTable, indexMetaDataPtr, Collections.singletonList(index),
-                connection);
-        byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+    public MutationPlan compile(PTable index) throws SQLException {
+        try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+            String query = "SELECT count(*) FROM " + tableName;
+            this.plan = statement.compileQuery(query);
+            TableRef tableRef = plan.getTableRef();
+            Scan scan = plan.getContext().getScan();
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            dataTable = tableRef.getTable();
+            if (index.getIndexType() == PTable.IndexType.GLOBAL &&  dataTable.isTransactional()) {
+                throw new IllegalArgumentException(
+                        "ServerBuildIndexCompiler does not support global indexes on transactional tables");
+            }
+            IndexMaintainer.serialize(dataTable, ptr, Collections.singletonList(index), plan.getContext().getConnection());
+            // Set the scan attributes that UngroupedAggregateRegionObserver will switch on.
+            // For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
+            // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute is set to the serialized form of index
+            // metadata to build index rows from data table rows. For global indexes, we also need to set (1) the
+            // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to signal UngroupedAggregateRegionObserver
+            // that this scan is for building global indexes, and (2) the MetaDataProtocol.PHOENIX_VERSION attribute,
+            // which will be passed as a mutation attribute on the scanned mutations that will be applied to
+            // the index table, possibly remotely.
+            if (index.getIndexType() == PTable.IndexType.LOCAL) {
+                scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, ByteUtil.copyKeyBytesIfNecessary(ptr));
+            } else {
+                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ByteUtil.copyKeyBytesIfNecessary(ptr));
+                scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+                ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION);
+            }
+            // By default, we'd use a FirstKeyOnly filter as nothing else needs to be projected for count(*).
+            // However, in this case, we need to project all of the data columns that contribute to the index.
+            IndexMaintainer indexMaintainer = index.getIndexMaintainer(dataTable, connection);
+            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
+                if (index.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
+                    scan.addFamily(columnRef.getFamily());
+                } else {
+                    scan.addColumn(columnRef.getFamily(), columnRef.getQualifier());
+                }
+            }
 
-        // Set the scan attributes that UngroupedAggregateRegionObserver will switch on.
-        // For local indexes, the BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
-        // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute is set to the serialized form of index
-        // metadata to build index rows from data table rows. For global indexes, we also need to set (1) the
-        // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to signal UngroupedAggregateRegionObserver
-        // that this scan is for building global indexes and (2) the MetaDataProtocol.PHOENIX_VERSION attribute
-        // that will be passed as a mutation attribute for the scanned mutations that will be applied on
-        // the index table possibly remotely
-        ScanUtil.setClientVersion(dataTableScan, MetaDataProtocol.PHOENIX_VERSION);
-        if (index.getIndexType() == PTable.IndexType.LOCAL) {
-            dataTableScan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, attribValue);
-        } else {
-            dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
-            dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
-            ScanUtil.setClientVersion(dataTableScan, MetaDataProtocol.PHOENIX_VERSION);
-        }
-        if (dataTable.isTransactional()) {
-            dataTableScan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
-        }
+            if (dataTable.isTransactional()) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, connection.getMutationState().encodeTransaction());
+            }
 
-        // Go through MutationPlan abstraction so that we can create local indexes
-        // with a connectionless connection (which makes testing easier).
-        return new RowCountMutationPlan(queryPlan.getContext(), PhoenixStatement.Operation.UPSERT);
+            // Go through MutationPlan abstraction so that we can create local indexes
+            // with a connectionless connection (which makes testing easier).
+            return new RowCountMutationPlan(plan.getContext(), PhoenixStatement.Operation.UPSERT);
+        }
     }
 }
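
For readers tracing the new compile(PTable) flow above, here is a minimal,
self-contained sketch of the scan setup in isolation, using only the HBase
client API. The attribute name strings and the isLocalIndex/singleCellScheme
flags are illustrative placeholders, not the actual Phoenix constants:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IndexBuildScanSketch {
        public static void main(String[] args) {
            boolean isLocalIndex = false;        // hypothetical flag
            boolean singleCellScheme = false;    // stands in for SINGLE_CELL_ARRAY_WITH_OFFSETS
            byte[] indexMetaData = new byte[0];  // serialized index metadata, elided here
            byte[] family = Bytes.toBytes("0");
            byte[] qualifier = Bytes.toBytes("COL1");

            Scan scan = new Scan();
            if (isLocalIndex) {
                // Local indexes: ship the serialized index metadata under the local-build attribute.
                scan.setAttribute("LOCAL_INDEX_BUILD_PROTO", indexMetaData);
            } else {
                // Global indexes: index metadata plus a rebuild marker.
                scan.setAttribute("INDEX_PROTO_MD", indexMetaData);
                scan.setAttribute("REBUILD_INDEXES", Bytes.toBytes(true));
            }
            // Project the data columns the index needs; with the single-cell
            // storage scheme the whole column family must be read.
            if (singleCellScheme) {
                scan.addFamily(family);
            } else {
                scan.addColumn(family, qualifier);
            }
            System.out.println(scan.getAttributesMap().keySet());
        }
    }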
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index d631b84..1b3b9c3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -299,11 +299,6 @@ public class GlobalIndexChecker extends BaseRegionObserver {
                 buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
                 buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
                 buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
-                // We want delete markers to be replayed during index rebuild.
-                buildIndexScan.setRaw(true);
-                buildIndexScan.setCacheBlocks(false);
-                buildIndexScan.setMaxVersions();
-                buildIndexScan.setTimeRange(0, maxTimestamp);
             }
             // Rebuild the index row from the corresponding row in the data table
             // Get the data row key from the index row key
@@ -311,6 +306,9 @@ public class GlobalIndexChecker extends BaseRegionObserver {
             buildIndexScan.withStartRow(dataRowKey, true);
             buildIndexScan.withStopRow(dataRowKey, true);
             buildIndexScan.setTimeRange(0, maxTimestamp);
+            // If the data table row has been deleted, we want to delete the corresponding index row too.
+            // Thus, we use a raw scan.
+            buildIndexScan.setRaw(true);
             try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScan)){
                 resultScanner.next();
             } catch (Throwable t) {
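
The GlobalIndexChecker hunk above replaces the earlier block of raw-scan
settings with a single setRaw(true) on a point lookup. A minimal sketch of
that read-repair scan follows; the row key and timestamp values are made up
for illustration:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReadRepairScanSketch {
        public static void main(String[] args) throws Exception {
            byte[] dataRowKey = Bytes.toBytes("row-1");      // hypothetical data row key
            long maxTimestamp = System.currentTimeMillis();  // upper bound for the rebuild

            Scan buildIndexScan = new Scan();
            // Point lookup: scan exactly one data table row, inclusively.
            buildIndexScan.withStartRow(dataRowKey, true);
            buildIndexScan.withStopRow(dataRowKey, true);
            buildIndexScan.setTimeRange(0, maxTimestamp);
            // A raw scan returns delete markers, so a deleted data row can be
            // noticed and the corresponding index row deleted as well.
            buildIndexScan.setRaw(true);
            System.out.println(buildIndexScan);
        }
    }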
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 5128c26..1abcef4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -82,16 +82,18 @@ public class PhoenixServerBuildIndexInputFormat<T extends DBWritable> extends Ph
         try (final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps)) {
             PhoenixConnection phoenixConnection = connection.unwrap(PhoenixConnection.class);
             Long scn = (currentScnValue != null) ? Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
+            PTable indexTable = PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
             ServerBuildIndexCompiler compiler =
-                    new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName, indexTableFullName);
-            MutationPlan plan = compiler.compile();
-            queryPlan = plan.getQueryPlan();
-            Scan scan = queryPlan.getContext().getScan();
+                    new ServerBuildIndexCompiler(phoenixConnection, dataTableFullName);
+            MutationPlan plan = compiler.compile(indexTable);
+            Scan scan = plan.getContext().getScan();
+
             try {
                 scan.setTimeRange(0, scn);
             } catch (IOException e) {
                 throw new SQLException(e);
             }
+            queryPlan = plan.getQueryPlan();
             // Since we can't set an SCN on connections with transactions, we set the TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver
             if (txnScnValue != null) {
                 scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
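
The input format change above keeps the existing pattern of capping the scan
at the SCN and surfacing HBase's checked IOException as a SQLException. A
small stand-alone sketch of that pattern (capScanAt is a hypothetical helper,
not a Phoenix method):

    import java.io.IOException;
    import java.sql.SQLException;
    import org.apache.hadoop.hbase.client.Scan;

    public class ScanTimeRangeSketch {
        // Scan.setTimeRange throws IOException; JDBC-facing code rethrows
        // it wrapped in SQLException, as in the hunk above.
        static void capScanAt(Scan scan, long scn) throws SQLException {
            try {
                scan.setTimeRange(0, scn);
            } catch (IOException e) {
                throw new SQLException(e);
            }
        }

        public static void main(String[] args) throws SQLException {
            Scan scan = new Scan();
            capScanAt(scan, System.currentTimeMillis());
            System.out.println(scan.getTimeRange());
        }
    }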
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 3c823d5..d290333 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1301,10 +1301,8 @@ public class MetaDataClient {
             PostIndexDDLCompiler compiler = new PostIndexDDLCompiler(connection, dataTableRef);
             return compiler.compile(index);
         } else {
-            ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection,
-                    SchemaUtil.getTableName(dataTableRef.getTable().getSchemaName().getString(), dataTableRef.getTable().getTableName().getString()),
-                    SchemaUtil.getTableName(index.getSchemaName().getString(), index.getTableName().getString()));
-            return compiler.compile();
+            ServerBuildIndexCompiler compiler = new ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
+            return compiler.compile(index);
         }
     }
 
@@ -1674,6 +1672,9 @@ public class MetaDataClient {
             return buildIndexAtTimeStamp(table, statement.getTable());
         }
 
+        String dataTableFullName = SchemaUtil.getTableName(
+                tableRef.getTable().getSchemaName().getString(),
+                tableRef.getTable().getTableName().getString());
         return buildIndex(table, tableRef);
     }
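
Finally, the MetaDataClient hunks replace the two inline
SchemaUtil.getTableName(schema, table) calls with a single
getFullTableName(dataTableRef) helper. The sketch below is a stand-in for
the composition that helper performs; it is hypothetical, not the Phoenix
implementation:

    public class FullTableNameSketch {
        // Compose "SCHEMA.TABLE", or just "TABLE" when there is no schema.
        static String getFullTableName(String schemaName, String tableName) {
            return (schemaName == null || schemaName.isEmpty())
                    ? tableName
                    : schemaName + "." + tableName;
        }

        public static void main(String[] args) {
            System.out.println(getFullTableName("MY_SCHEMA", "DATA_TABLE")); // MY_SCHEMA.DATA_TABLE
            System.out.println(getFullTableName(null, "DATA_TABLE"));        // DATA_TABLE
        }
    }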