You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by an...@apache.org on 2016/10/21 14:04:43 UTC

phoenix git commit: PHOENIX-3161 Check possibility of moving rebuilding code to coprocessor of data table.

Repository: phoenix
Updated Branches:
  refs/heads/master 5c9fb7b68 -> a95e8ab1a


PHOENIX-3161 Check possibility of moving rebuilding code to coprocessor of data table.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a95e8ab1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a95e8ab1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a95e8ab1

Branch: refs/heads/master
Commit: a95e8ab1af2b8defef3d8c0ed5a060c9b9881dd9
Parents: 5c9fb7b
Author: Ankit Singhal <an...@gmail.com>
Authored: Fri Oct 21 19:34:08 2016 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Fri Oct 21 19:34:08 2016 +0530

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    |  11 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   1 +
 .../coprocessor/MetaDataRegionObserver.java     | 101 +++++++------------
 .../UngroupedAggregateRegionObserver.java       | 100 +++++++++++++++++-
 .../hbase/index/util/IndexManagementUtil.java   |  13 ++-
 .../java/org/apache/phoenix/util/ScanUtil.java  |   4 +
 7 files changed, 154 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index d6c1e9c..4263890 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -92,7 +92,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT {
     public MutableIndexFailureIT(boolean transactional, boolean localIndex, boolean isNamespaceMapped) {
         this.transactional = transactional;
         this.localIndex = localIndex;
-        this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : "";
+        this.tableDDLOptions = " SALT_BUCKETS=2 " + (transactional ? ", TRANSACTIONAL=true " : "");
         this.tableName = (localIndex ? "L_" : "") + TestUtil.DEFAULT_DATA_TABLE_NAME + (transactional ? "_TXN" : "")
                 + (isNamespaceMapped ? "_NM" : "");
         this.indexName = INDEX_NAME;
@@ -180,8 +180,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT {
             
             query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
             rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-            String expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
-                    + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped);
+            String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
+                    + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT";
             assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
             rs = conn.createStatement().executeQuery(query);
             assertTrue(rs.next());
@@ -232,8 +232,8 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT {
                 // Verify previous writes succeeded to data table
                 query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
                 rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-                expectedPlan = "CLIENT PARALLEL 1-WAY FULL SCAN OVER "
-                        + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped);
+                expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
+                        + SchemaUtil.getPhysicalTableName(fullTableName.getBytes(), isNamespaceMapped)+"\nCLIENT MERGE SORT";
                 assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
                 rs = conn.createStatement().executeQuery(query);
                 assertTrue(rs.next());
@@ -254,6 +254,7 @@ public class MutableIndexFailureIT extends BaseOwnClusterIT {
             // re-enable index table
             FAIL_WRITE = false;
             waitForIndexToBeActive(conn,indexName);
+            waitForIndexToBeActive(conn,indexName+"_2");
             waitForIndexToBeActive(conn,secondIndexName);
 
             // Verify UPSERT on data table still work after index table is recreated

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index 004e254..393499a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -210,7 +210,7 @@ public class PostDDLCompiler {
                         if (ts!=HConstants.LATEST_TIMESTAMP && tableRef.getTable().isTransactional()) {
                             ts = TransactionUtil.convertToNanoseconds(ts);
                         }
-                        ScanUtil.setTimeRange(scan, ts);
+                        ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
                         if (emptyCF != null) {
                             scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
                         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 4fa1399..f6bd512 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -88,6 +88,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
     public static final String REVERSE_SCAN = "_ReverseScan";
     public static final String ANALYZE_TABLE = "_ANALYZETABLE";
+    public static final String REBUILD_INDEXES = "_RebuildIndexes";
     public static final String TX_STATE = "_TxState";
     public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES";
     public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index c645cf4..e790b59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -17,8 +17,9 @@
  */
 package org.apache.phoenix.coprocessor;
 
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
 import java.io.IOException;
-import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,16 +34,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -55,19 +52,19 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.PostDDLCompiler;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.parse.AlterIndexStatement;
-import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -75,7 +72,7 @@ import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -281,6 +278,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     	// Set SCN so that we don't ping server and have the upper bound set back to
                     	// the timestamp when the failure occurred.
                     	props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(Long.MAX_VALUE));
+                    	
+                    	//Set timeout to max value as rebuilding may take time
+                    	props.setProperty(QueryServices.THREAD_TIMEOUT_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
+                    	props.setProperty(QueryServices.HBASE_CLIENT_SCANNER_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
+                    	props.setProperty(QueryServices.RPC_TIMEOUT_ATTRIB, Long.toString(Long.MAX_VALUE));
                     	// don't run a second index populations upsert select 
                         props.setProperty(QueryServices.INDEX_POPULATION_SLEEP_TIME, "0"); 
                         conn = QueryUtil.getConnectionOnServer(props, env.getConfiguration()).unwrap(PhoenixConnection.class);
@@ -305,7 +307,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     // Allow index to begin incremental maintenance as index is back online and we
                     // cannot transition directly from DISABLED -> ACTIVE
                     if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) {
-                        updateIndexState(indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE);
+                        updateIndexState(conn, indexTableFullName, env, PIndexState.DISABLE, PIndexState.INACTIVE);
                     }
                     List<PTable> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable);
                     if (indexesToPartiallyRebuild == null) {
@@ -344,62 +346,31 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                             long timeStamp = Math.max(0, earliestDisableTimestamp - overlapTime);
                             LOG.info("Starting to build " + dataPTable + " indexes " + indexesToPartiallyRebuild
                                     + " from timestamp=" + timeStamp);
-                            Scan dataTableScan = IndexManagementUtil.newLocalStateScan(maintainers);
+                            TableRef tableRef = new TableRef(null, dataPTable, HConstants.LATEST_TIMESTAMP, false);
+                            // TODO Need to set high timeout 
+                            PostDDLCompiler compiler = new PostDDLCompiler(conn);
+                            MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null,
+                                    HConstants.LATEST_TIMESTAMP);
+                            Scan dataTableScan = IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(),
+                                    maintainers);
                             dataTableScan.setTimeRange(timeStamp, HConstants.LATEST_TIMESTAMP);
-                            byte[] physicalTableName = dataPTable.getPhysicalName().getBytes();
-                            try (HTableInterface dataHTable = conn.getQueryServices().getTable(physicalTableName)) {
-                                Result result;
-                                try (ResultScanner dataTableScanner = dataHTable.getScanner(dataTableScan)) {
-                                    int batchSize = conn.getMutateBatchSize();
-                                    List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
-                                    ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
-                                            ByteUtil.EMPTY_BYTE_ARRAY);
-                                    IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr,
-                                            indexesToPartiallyRebuild, conn);
-                                    byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
-                                    byte[] uuidValue = ServerCacheClient.generateId();
+                            dataTableScan.setCacheBlocks(false);
+                            dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES);
+                            
+                            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(
+                                    ByteUtil.EMPTY_BYTE_ARRAY);
+                            IndexMaintainer.serializeAdditional(dataPTable, indexMetaDataPtr, indexesToPartiallyRebuild,
+                                    conn);
+                            byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
 
-                                    while ((result = dataTableScanner.next()) != null && !result.isEmpty()) {
-                                        Put put = null;
-                                        Delete del = null;
-                                        for (Cell cell : result.rawCells()) {
-                                            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
-                                                if (put == null) {
-                                                    put = new Put(CellUtil.cloneRow(cell));
-                                                    put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                                    put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                                    put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
-                                                            PDataType.TRUE_BYTES);
-                                                    mutations.add(put);
-                                                }
-                                                put.add(cell);
-                                            } else {
-                                                if (del == null) {
-                                                    del = new Delete(CellUtil.cloneRow(cell));
-                                                    del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                                    del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                                    del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
-                                                            PDataType.TRUE_BYTES);
-                                                    mutations.add(del);
-                                                }
-                                                del.addDeleteMarker(cell);
-                                            }
-                                        }
-                                        if (mutations.size() == batchSize) {
-                                            dataHTable.batch(mutations);
-                                            uuidValue = ServerCacheClient.generateId();
-                                            mutations.clear();
-                                        }
-                                    }
-                                    if (!mutations.isEmpty()) {
-                                        dataHTable.batch(mutations);
-                                    }
-                                }
-                            }
+                            dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                            MutationState mutationState = plan.execute();
+                            long rowCount = mutationState.getUpdateCount();
+                            LOG.info(rowCount + " rows of index which are rebuild");
                             for (PTable indexPTable : indexesToPartiallyRebuild) {
                                 String indexTableFullName = SchemaUtil.getTableName(indexPTable.getSchemaName()
                                         .getString(), indexPTable.getTableName().getString());
-                                updateIndexState(indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE);
+                                updateIndexState(conn, indexTableFullName, env, PIndexState.INACTIVE, PIndexState.ACTIVE);
                             }
                         } catch (Exception e) { // Log, but try next table's indexes
                             LOG.warn("Unable to rebuild " + dataPTable + " indexes " + indexesToPartiallyRebuild
@@ -429,11 +400,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
         }
     }
     
-    private static void updateIndexState(String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState,
+    private static void updateIndexState(PhoenixConnection conn, String indexTableName, RegionCoprocessorEnvironment env, PIndexState oldState,
             PIndexState newState) throws ServiceException, Throwable {
         byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
         String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
-        String indexName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
+        String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
         // Mimic the Put that gets generated by the client on an update of the index state
         Put put = new Put(indexTableKey);
         put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
@@ -443,9 +414,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver {
                     PLong.INSTANCE.toBytes(0));
         }
         final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
-        Connection conn = QueryUtil.getConnection(env.getConfiguration());
-        MetaDataMutationResult result = conn.unwrap(PhoenixConnection.class).getQueryServices()
-                .updateIndexState(tableMetadata, null);
+        MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null);
         MutationCode code = result.getMutationCode();
         if (code == MutationCode.TABLE_NOT_FOUND) { throw new TableNotFoundException(schemaName, indexName); }
         if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { throw new SQLExceptionInfo.Builder(

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0f175c5..38f7253 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -43,6 +43,7 @@ import javax.annotation.concurrent.GuardedBy;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
 import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.execute.TupleProjector;
@@ -263,7 +265,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     env, region.getRegionInfo().getTable().getNameAsString(), ts,
                     gp_width_bytes, gp_per_region_bytes);
             return collectStats(s, statsCollector, region, scan, env.getConfiguration());
-        }
+        } else if (ScanUtil.isIndexRebuild(scan)) { return rebuildIndices(s, region, scan, env.getConfiguration()); }
         int offsetToBe = 0;
         if (localIndexScan) {
             /*
@@ -725,6 +727,102 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
     
+    private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
+            Configuration config) throws IOException {
+        byte[] indexMetaData = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+        boolean hasMore;
+        long rowCount = 0;
+        try {
+            int batchSize = config.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            List<Mutation> mutations = Lists.newArrayListWithExpectedSize(batchSize);
+            region.startRegionOperation();
+            byte[] uuidValue = ServerCacheClient.generateId();
+            synchronized (innerScanner) {
+                do {
+                    List<Cell> results = new ArrayList<Cell>();
+                    hasMore = innerScanner.nextRaw(results);
+                    if (!results.isEmpty()) {
+                        Put put = null;
+                        Delete del = null;
+                        for (Cell cell : results) {
+
+                            if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) {
+                                if (put == null) {
+                                    put = new Put(CellUtil.cloneRow(cell));
+                                    put.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
+                                    put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                    put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+                                            PDataType.TRUE_BYTES);
+                                    mutations.add(put);
+                                }
+                                put.add(cell);
+                            } else {
+                                if (del == null) {
+                                    del = new Delete(CellUtil.cloneRow(cell));
+                                    del.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMetaData);
+                                    del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+                                    del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS,
+                                            PDataType.TRUE_BYTES);
+                                    mutations.add(del);
+                                }
+                                del.addDeleteMarker(cell);
+                            }
+                        }
+                        if (mutations.size() >= batchSize) {
+                            region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
+                                    HConstants.NO_NONCE);
+                            uuidValue = ServerCacheClient.generateId();
+                            mutations.clear();
+                        }
+                        rowCount++;
+                    }
+                    
+                } while (hasMore);
+                if (!mutations.isEmpty()) {
+                    region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE,
+                            HConstants.NO_NONCE);
+                }
+            }
+        } catch (IOException e) {
+            logger.error("IOException during rebuilding: " + Throwables.getStackTraceAsString(e));
+            throw e;
+        } finally {
+            region.closeRegionOperation();
+        }
+        byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
+        final KeyValue aggKeyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
+                SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+
+        RegionScanner scanner = new BaseRegionScanner(innerScanner) {
+            @Override
+            public HRegionInfo getRegionInfo() {
+                return region.getRegionInfo();
+            }
+
+            @Override
+            public boolean isFilterDone() {
+                return true;
+            }
+
+            @Override
+            public void close() throws IOException {
+                // no-op because we want to manage closing of the inner scanner ourselves.
+            }
+
+            @Override
+            public boolean next(List<Cell> results) throws IOException {
+                results.add(aggKeyValue);
+                return false;
+            }
+
+            @Override
+            public long getMaxResultSize() {
+                return scan.getMaxResultSize();
+            }
+        };
+        return scanner;
+    }
+    
     private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
             final Region region, final Scan scan, Configuration config) throws IOException {
         StatsCollectionCallable callable =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 4519145..c6642e7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -20,18 +20,14 @@ package org.apache.phoenix.hbase.index.util;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-
-import com.google.common.collect.Maps;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
 import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
@@ -150,7 +146,14 @@ public class IndexManagementUtil {
     }
 
     public static Scan newLocalStateScan(List<? extends Iterable<? extends ColumnReference>> refsArray) {
-        Scan s = new Scan();
+        return newLocalStateScan(null, refsArray);
+    }
+
+    public static Scan newLocalStateScan(Scan scan, List<? extends Iterable<? extends ColumnReference>> refsArray) {
+        Scan s = scan;
+        if (scan == null) {
+            s = new Scan();
+        }
         s.setRaw(true);
         // add the necessary columns to the scan
         for (Iterable<? extends ColumnReference> refs : refsArray) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a95e8ab1/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index b0e8a99..acaeb31 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -895,4 +895,8 @@ public class ScanUtil {
         return true;
     }
 
+    public static boolean isIndexRebuild(Scan scan) {
+        return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null;
+    }
+
 }
\ No newline at end of file