Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/11/14 01:59:13 UTC

[phoenix] branch master updated: PHOENIX-5998 Paged server side ungrouped aggregate operations

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 881dd8d  PHOENIX-5998 Paged server side ungrouped aggregate operations
881dd8d is described below

commit 881dd8d6033def07d0371cebbaa5635031594277
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Wed Nov 11 16:32:24 2020 -0800

    PHOENIX-5998 Paged server side ungrouped aggregate operations
---
 .../apache/phoenix/end2end/SpillableGroupByIT.java |   1 +
 .../org/apache/phoenix/end2end/UpsertSelectIT.java |   1 -
 .../coprocessor/BaseScannerRegionObserver.java     |   6 +-
 .../UngroupedAggregateRegionObserver.java          | 719 ++++-----------------
 .../UngroupedAggregateRegionScanner.java           | 670 +++++++++++++++++++
 .../phoenix/iterate/TableResultIterator.java       |   4 +
 .../UngroupedAggregatingResultIterator.java        |  43 +-
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   3 +-
 .../java/org/apache/phoenix/query/BaseTest.java    |   9 +-
 10 files changed, 838 insertions(+), 620 deletions(-)
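
In outline, this change lifts the several-hundred-line scan-and-mutate loop out of
UngroupedAggregateRegionObserver.doPostScannerOpen() into the new
UngroupedAggregateRegionScanner, and bounds how long the server works per RPC call
via the new page-size-in-milliseconds setting, so long-running ungrouped aggregate
operations (DELETE, UPSERT SELECT, aggregate queries, index maintenance) keep
returning to the client instead of risking RPC timeouts. The core idea, reduced to
a hypothetical sketch (all names below are illustrative, not the actual Phoenix code):

    import java.util.Iterator;

    // Time-based paging: aggregate for at most pageSizeMs per call, then
    // yield so the RPC returns before it can hit a timeout. The caller
    // re-invokes nextPage() until it returns false.
    final class PagedSum {
        private final Iterator<Long> rows; // stands in for the region scan
        private final long pageSizeMs;     // cf. AGGREGATE_PAGE_SIZE_IN_MS
        private long sum;                  // running aggregate across pages

        PagedSum(Iterator<Long> rows, long pageSizeMs) {
            this.rows = rows;
            this.pageSizeMs = pageSizeMs;
        }

        // Returns true while more pages remain, false when the scan is done.
        boolean nextPage() {
            long deadline = System.currentTimeMillis() + pageSizeMs;
            while (rows.hasNext()) {
                sum += rows.next();
                if (System.currentTimeMillis() >= deadline) {
                    return true; // yield; resume on the next call
                }
            }
            return false; // scan exhausted, sum is final
        }

        long result() { return sum; }
    }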

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
index d7e1b37..c6692f8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
@@ -80,6 +80,7 @@ public class SpillableGroupByIT extends BaseOwnClusterIT {
         props.put(QueryServices.STATS_COLLECTION_ENABLED, Boolean.toString(false));
         props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
         props.put(QueryServices.EXPLAIN_ROW_COUNT_ATTRIB, Boolean.TRUE.toString());
+        props.put(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, Long.toString(1000));
         // Must update config before starting server
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
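
The page size has to be in the server-side configuration before the region servers
start, since the coprocessor reads it there; that is why the test above sets it
before setUpTestDriver(). Outside of tests, the same knob could presumably be set
on a plain Configuration (the property string behind the constant lives in the
QueryServices diff, which is not shown in this excerpt):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;

    Configuration conf = HBaseConfiguration.create();
    // One-second aggregate pages, matching the test setting above.
    conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 1000L);
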
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index c55e9b7..007d1ef 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -1680,7 +1680,6 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
             fail();
         } catch (SQLException e) {
             assertEquals(SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY.getErrorCode(), e.getErrorCode());
-            assertFalse(e.getMessage().contains(invalidValue));
             assertTrue(e.getMessage().contains(columnTypeInfo));
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 712f719..423d0d3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -74,9 +74,13 @@ abstract public class BaseScannerRegionObserver extends CompatBaseScannerRegionO
     public static final String GROUP_BY_LIMIT = "_GroupByLimit";
     public static final String LOCAL_INDEX = "_LocalIndex";
     public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild";
-    // The number of index rows to be rebuilt in one RPC call
     public static final String INDEX_REBUILD_PAGING = "_IndexRebuildPaging";
+    // The number of index rows to be rebuilt in one RPC call
     public static final String INDEX_REBUILD_PAGE_ROWS = "_IndexRebuildPageRows";
+    public static final String SERVER_PAGING = "_ServerPaging";
+    // The maximum time, in milliseconds, to spend scanning rows in one RPC call
+    public static final String AGGREGATE_PAGE_SIZE_IN_MS = "_AggregatePageSizeInMs";
+
     // Index verification type done by the index tool
     public static final String INDEX_REBUILD_VERIFY_TYPE = "_IndexRebuildVerifyType";
     public static final String INDEX_RETRY_VERIFY = "_IndexRetryVerify";
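
SERVER_PAGING and AGGREGATE_PAGE_SIZE_IN_MS are plain scan attributes, so the
client-side query plan can presumably request paging the same way the existing
attributes are set. A hedged sketch (Scan#setAttribute is standard HBase API, but
the value encodings here are assumptions; the patch only shows the attribute names):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;

    Scan buildPagedScan(long pageSizeMs) {
        Scan scan = new Scan();
        // Ask the server-side coprocessor to page its work.
        scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, Bytes.toBytes(true));
        scan.setAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS, Bytes.toBytes(pageSizeMs));
        return scan;
    }
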
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 2d58c69..c8fd915 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -21,9 +21,6 @@ import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
-import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
-import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
 import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.COMPACTION_UPDATE_STATS_ROW_COUNT;
 import static org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker.CONCURRENT_UPDATE_STATS_ROW_COUNT;
 
@@ -35,7 +32,6 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -43,12 +39,8 @@ import java.util.concurrent.Callable;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import org.apache.phoenix.index.GlobalIndexChecker;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -56,11 +48,7 @@ import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -86,27 +74,16 @@ import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
-import org.apache.phoenix.cache.GlobalCache;
-import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.generated.PTableProtos;
-import org.apache.phoenix.exception.DataExceedsCapacityException;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
-import org.apache.phoenix.expression.aggregator.Aggregator;
-import org.apache.phoenix.expression.aggregator.Aggregators;
-import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
-import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.exception.IndexWriteException;
-import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -116,47 +93,26 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.mapreduce.index.IndexTool;
-import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.RowKeySchema;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.NoOpStatisticsCollector;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
 import org.apache.phoenix.schema.stats.StatisticsCollectorFactory;
 
 import org.apache.phoenix.schema.stats.StatsCollectionDisabledOnServerException;
-import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
-import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
-import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.schema.types.PBinary;
-import org.apache.phoenix.schema.types.PChar;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PDouble;
-import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PLong;
-import org.apache.phoenix.transaction.PhoenixTransactionContext;
-import org.apache.phoenix.transaction.PhoenixTransactionProvider;
-import org.apache.phoenix.transaction.TransactionFactory;
-import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
-import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -165,15 +121,11 @@ import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
-import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
-import org.apache.phoenix.thirdparty.com.google.common.primitives.Ints;
-
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
 
 /**
 * Region observer that aggregates ungrouped rows (i.e. SQL query with aggregation function and no GROUP BY).
@@ -220,7 +172,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     @GuardedBy("lock")
     private boolean isRegionClosingOrSplitting = false;
     private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class);
-    private KeyValueBuilder kvBuilder;
     private Configuration upsertSelectConfig;
     private Configuration compactionConfig;
     private Configuration indexWriteConfig;
@@ -233,9 +184,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     @Override
     public void start(CoprocessorEnvironment e) throws IOException {
-        // Can't use ClientKeyValueBuilder on server-side because the memstore expects to
-        // be able to get a single backing buffer for a KeyValue.
-        this.kvBuilder = GenericKeyValueBuilder.INSTANCE;
         /*
          * We need to create a copy of region's configuration since we don't want any side effect of
          * setting the RpcControllerFactory.
@@ -248,19 +196,45 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
          * priority handlers which could result in a deadlock.
          */
         upsertSelectConfig.setClass(RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY,
-            InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
+                InterRegionServerIndexRpcControllerFactory.class, RpcControllerFactory.class);
 
         compactionConfig = ServerUtil.getCompactionConfig(e.getConfiguration());
 
         // For retries of index write failures, use the same # of retries as the rebuilder
         indexWriteConfig = PropertiesUtil.cloneConfig(e.getConfiguration());
         indexWriteConfig.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
-                QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER));
+                e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
+                        QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER));
         indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
     }
 
-    public void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
+    Configuration getUpsertSelectConfig() {
+        return upsertSelectConfig;
+    }
+
+    void incrementScansReferenceCount() throws IOException {
+        synchronized (lock) {
+            if (isRegionClosingOrSplitting) {
+                throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
+            }
+            scansReferenceCount++;
+            lock.notifyAll();
+        }
+    }
+
+    void decrementScansReferenceCount() {
+        synchronized (lock) {
+            scansReferenceCount--;
+            if (scansReferenceCount < 0) {
+                LOGGER.warn(
+                        "Scan reference count went below zero. Something isn't correct. Resetting it back to zero");
+                scansReferenceCount = 0;
+            }
+            lock.notifyAll();
+        }
+    }
+
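
These two helpers factor out the scan reference counting that doPostScannerOpen()
previously did inline (see the removed block further down in this diff). The new
region scanner can then bracket any write-producing scan with them, roughly:

    // Illustrative call pattern; the real call sites are in
    // UngroupedAggregateRegionScanner.
    observer.incrementScansReferenceCount(); // throws if the region is closing or splitting
    try {
        // ... scan rows and commit mutation batches ...
    } finally {
        observer.decrementScansReferenceCount(); // always release the count
    }
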
+    void commitBatchWithRetries(final Region region, final List<Mutation> localRegionMutations, final long blockingMemstoreSize) throws IOException {
         try {
             commitBatch(region, localRegionMutations, blockingMemstoreSize);
         } catch (IOException e) {
@@ -278,10 +252,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    private void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
-      if (mutations.isEmpty()) {
-          return;
-      }
+    void commitBatch(Region region, List<Mutation> mutations, long blockingMemstoreSize) throws IOException {
+        if (mutations.isEmpty()) {
+            return;
+        }
 
        Mutation[] mutationArray = new Mutation[mutations.size()];
       // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the
@@ -300,9 +274,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
       region.batchMutate(mutations.toArray(mutationArray));
     }
 
-    private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID,
-                                                  byte[] indexMaintainersPtr, byte[] txState,
-                                                  byte[] clientVersionBytes, boolean useIndexProto) {
+    static void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID,
+                                                 byte[] indexMaintainersPtr, byte[] txState,
+                                                 byte[] clientVersionBytes, boolean useIndexProto) {
         for (Mutation m : mutations) {
            if (indexMaintainersPtr != null) {
                m.setAttribute(useIndexProto ? PhoenixIndexCodec.INDEX_PROTO_MD : PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
@@ -320,9 +294,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     }
 
     private void commitBatchWithTable(Table table, List<Mutation> mutations) throws IOException {
-      if (mutations.isEmpty()) {
-          return;
-      }
+        if (mutations.isEmpty()) {
+            return;
+        }
 
         LOGGER.debug("Committing batch of " + mutations.size() + " mutations for " + table);
         try {
@@ -340,9 +314,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
      * a high chance that flush might not proceed and memstore won't be freed up.
      * @throws IOException
      */
-    public void checkForRegionClosingOrSplitting() throws IOException {
+    void checkForRegionClosingOrSplitting() throws IOException {
         synchronized (lock) {
-            if(isRegionClosingOrSplitting) {
+            if (isRegionClosingOrSplitting) {
                 lock.notifyAll();
                 throw new IOException("Region is getting closed. Not allowing to write to avoid possible deadlock.");
             }
@@ -369,14 +343,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     public static class MutationList extends ArrayList<Mutation> {
         private long byteSize = 0L;
+
         public MutationList() {
             super();
         }
-        
-        public MutationList(int size){
+
+        public MutationList(int size) {
             super(size);
         }
-        
+
         @Override
         public boolean add(Mutation e) {
             boolean r = super.add(e);
@@ -392,24 +367,25 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
         @Override
         public void clear() {
-            byteSize = 0L;
+            byteSize = 0L;
             super.clear();
         }
     }
 
-    public static long getBlockingMemstoreSize(Region region, Configuration conf) {
-       long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
-
-       if (flushSize <= 0) {
-           flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
-               TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
-       }
-       return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
-                   HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER)-1);
-   }
-    
+    static long getBlockingMemstoreSize(Region region, Configuration conf) {
+        long flushSize = region.getTableDescriptor().getMemStoreFlushSize();
+
+        if (flushSize <= 0) {
+            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+                    TableDescriptorBuilder.DEFAULT_MEMSTORE_FLUSH_SIZE);
+        }
+        return flushSize * (conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+                HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER) - 1);
+    }
+
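
With stock HBase defaults, getBlockingMemstoreSize() above works out to a 128 MB
flush size times (4 - 1) = 384 MB: writes are throttled once the memstore holds
three flushes' worth of data, one multiplier step short of the point where HBase
itself would block updates.
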
     @Override
-    protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException, SQLException {
+    protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan,
+                                              final RegionScanner s) throws IOException, SQLException {
         final RegionCoprocessorEnvironment env = c.getEnvironment();
         final Region region = env.getRegion();
         long ts = scan.getTimeRange().getMax();
@@ -436,39 +412,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 }
             });
         }
-
-        PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
         boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
         int offsetToBe = 0;
         if (localIndexScan) {
-            /*
-             * For local indexes, we need to set an offset on row key expressions to skip
-             * the region start key.
-             */
             offsetToBe = region.getRegionInfo().getStartKey().length != 0 ? region.getRegionInfo().getStartKey().length :
-                region.getRegionInfo().getEndKey().length;
-            ScanUtil.setRowKeyOffset(scan, offsetToBe);
+                    region.getRegionInfo().getEndKey().length;
         }
         final int offset = offsetToBe;
-        
-        PTable projectedTable = null;
-        PTable writeToTable = null;
-        byte[][] values = null;
         byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY);
         boolean isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null;
-        if (isDescRowKeyOrderUpgrade) {
-            LOGGER.debug("Upgrading row key for " + region.getRegionInfo().getTable().getNameAsString());
-            projectedTable = deserializeTable(descRowKeyTableBytes);
-            try {
-                writeToTable = PTableImpl.builderWithColumns(projectedTable,
-                        getColumnsToClone(projectedTable))
-                        .setRowKeyOrderOptimizable(true)
-                        .build();
-            } catch (SQLException e) {
-                ServerUtil.throwIOException("Upgrade failed", e); // Impossible
-            }
-            values = new byte[projectedTable.getPKColumns().size()][];
-        }
         boolean useProto = false;
         byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
         useProto = localIndexBytes != null;
@@ -476,47 +428,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
         }
         List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
-        MutationList indexMutations = localIndexBytes == null ? new MutationList() : new MutationList(1024);
-        
         RegionScanner theScanner = s;
-        
-        byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES);
-        byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
-        byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
-        byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
-        PhoenixTransactionProvider txnProvider = null;
-        if (txState != null) {
-            int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
-            txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion);
-        }
-        List<Expression> selectExpressions = null;
         byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
-        boolean isUpsert = false;
         boolean isDelete = false;
-        byte[] deleteCQ = null;
-        byte[] deleteCF = null;
-        byte[] emptyCF = null;
-        Connection targetHConn = null;
-        Table targetHTable = null;
-        boolean isPKChanging = false;
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        if (upsertSelectTable != null) {
-            isUpsert = true;
-            projectedTable = deserializeTable(upsertSelectTable);
-            targetHConn = ConnectionFactory.createConnection(upsertSelectConfig);
-            targetHTable = targetHConn.getTable(
-                TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
-            selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
-            values = new byte[projectedTable.getPKColumns().size()][];
-            isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
-        } else {
+        if (upsertSelectTable == null) {
             byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
             isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
-            if (!isDelete) {
-                deleteCF = scan.getAttribute(BaseScannerRegionObserver.DELETE_CF);
-                deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
-            }
-            emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
         }
         TupleProjector tupleProjector = null;
         byte[][] viewConstants = null;
@@ -531,422 +448,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
             ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
             theScanner =
-                    getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, 
-                        region, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
-        } 
-        
-        if (j != null)  {
-            theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
+                    getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector,
+                            region, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex);
         }
-        
-        int maxBatchSize = 0;
-        long maxBatchSizeBytes = 0L;
-        MutationList mutations = new MutationList();
-        boolean needToWrite = false;
-        Configuration conf = env.getConfiguration();
-
-        /**
-         * Slow down the writes if the memstore size more than
-         * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size
-         * bytes. This avoids flush storm to hdfs for cases like index building where reads and
-         * write happen to all the table regions in the server.
-         */
-        final long blockingMemStoreSize = getBlockingMemstoreSize(region, conf) ;
 
-        boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan;
-        if(buildLocalIndex) {
-            checkForLocalIndexColumnFamilies(region, indexMaintainers);
-        }
-        if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
-                || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
-            needToWrite = true;
-            if((isUpsert && (targetHTable == null ||
-                    !targetHTable.getName().equals(region.getTableDescriptor().getTableName())))) {
-                needToWrite = false;
-            }
-            maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
-            mutations = new MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
-            maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
-                QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
-        }
-        boolean hasMore;
-        int rowCount = 0;
-        boolean hasAny = false;
-        boolean acquiredLock = false;
-        boolean incrScanRefCount = false;
-        Aggregators aggregators = null;
-        Aggregator[] rowAggregators = null;
-        final RegionScanner innerScanner = theScanner;
-        final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
-        try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
-            aggregators = ServerAggregators.deserialize(
-                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf, em);
-            rowAggregators = aggregators.getAggregators();
-            Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan)));
-            }
-            boolean useIndexProto = true;
-            byte[] indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-            // for backward compatibility fall back to look by the old attribute
-            if (indexMaintainersPtr == null) {
-                indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
-                useIndexProto = false;
-            }
-    
-            if(needToWrite) {
-                synchronized (lock) {
-                    if (isRegionClosingOrSplitting) {
-                        throw new IOException("Temporarily unable to write from scan because region is closing or splitting");
-                    }
-                    scansReferenceCount++;
-                    incrScanRefCount = true;
-                    lock.notifyAll();
-                }
-            }
-            region.startRegionOperation();
-            acquiredLock = true;
-            synchronized (innerScanner) {
-                do {
-                    List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
-                    // Results are potentially returned even when the return value of s.next is false
-                    // since this is an indication of whether or not there are more values after the
-                    // ones returned
-                    hasMore = innerScanner.nextRaw(results);
-                    if (!results.isEmpty()) {
-                        rowCount++;
-                        result.setKeyValues(results);
-                        if (isDescRowKeyOrderUpgrade) {
-                            Arrays.fill(values, null);
-                            Cell firstKV = results.get(0);
-                            RowKeySchema schema = projectedTable.getRowKeySchema();
-                            int maxOffset = schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr);
-                            for (int i = 0; i < schema.getFieldCount(); i++) {
-                                Boolean hasValue = schema.next(ptr, i, maxOffset);
-                                if (hasValue == null) {
-                                    break;
-                                }
-                                Field field = schema.getField(i);
-                                if (field.getSortOrder() == SortOrder.DESC) {
-                                    // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case
-                                    if (field.getDataType().isArrayType()) {
-                                        field.getDataType().coerceBytes(ptr, null, field.getDataType(),
-                                            field.getMaxLength(), field.getScale(), field.getSortOrder(), 
-                                            field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte
-                                    }
-                                    // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters
-                                    else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) {
-                                        int len = ptr.getLength();
-                                        while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
-                                            len--;
-                                        }
-                                        ptr.set(ptr.get(), ptr.getOffset(), len);
-                                        // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
-                                    } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) {
-                                        byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
-                                        ptr.set(invertedBytes);
-                                    }
-                                } else if (field.getDataType() == PBinary.INSTANCE) {
-                                    // Remove trailing space characters so that the setValues call below will replace them
-                                    // with the correct zero byte character. Note this is somewhat dangerous as these
-                                    // could be legit, but I don't know what the alternative is.
-                                    int len = ptr.getLength();
-                                    while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
-                                        len--;
-                                    }
-                                    ptr.set(ptr.get(), ptr.getOffset(), len);                                        
-                                }
-                                values[i] = ptr.copyBytes();
-                            }
-                            writeToTable.newKey(ptr, values);
-                            if (Bytes.compareTo(
-                                firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), 
-                                ptr.get(),ptr.getOffset() + offset,ptr.getLength()) == 0) {
-                                continue;
-                            }
-                            byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                            if (offset > 0) { // for local indexes (prepend region start key)
-                                byte[] newRowWithOffset = new byte[offset + newRow.length];
-                                System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset);
-                                System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length);
-                                newRow = newRowWithOffset;
-                            }
-                            byte[] oldRow = Bytes.copy(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength());
-                            for (Cell cell : results) {
-                                // Copy existing cell but with new row key
-                                Cell newCell =
-                                    CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
-                                        setRow(newRow).
-                                        setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()).
-                                        setQualifier(cell.getQualifierArray(),
-                                            cell.getQualifierOffset(), cell.getQualifierLength()).
-                                        setTimestamp(cell.getTimestamp()).
-                                        setType(cell.getType()).setValue(cell.getValueArray(),
-                                        cell.getValueOffset(), cell.getValueLength()).build();
-                                switch (cell.getType()) {
-                                case Put:
-                                    // If Put, point delete old Put
-                                    Delete del = new Delete(oldRow);
-                                    Cell newDelCell =
-                                        CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
-                                            setRow(newRow).
-                                            setFamily(cell.getFamilyArray(), cell.getFamilyOffset(),
-                                                cell.getFamilyLength()).
-                                            setQualifier(cell.getQualifierArray(),
-                                                cell.getQualifierOffset(), cell.getQualifierLength()).
-                                            setTimestamp(cell.getTimestamp()).
-                                            setType(Cell.Type.Delete).
-                                            setValue(ByteUtil.EMPTY_BYTE_ARRAY,
-                                            0, 0).build();
-                                    del.add(newDelCell);
-                                    mutations.add(del);
-
-                                    Put put = new Put(newRow);
-                                    put.add(newCell);
-                                    mutations.add(put);
-                                    break;
-                                case Delete:
-                                case DeleteColumn:
-                                case DeleteFamily:
-                                case DeleteFamilyVersion:
-                                    Delete delete = new Delete(newRow);
-                                    delete.add(newCell);
-                                    mutations.add(delete);
-                                    break;
-                                }
-                            }
-                        } else if (buildLocalIndex) {
-                            for (IndexMaintainer maintainer : indexMaintainers) {
-                                if (!results.isEmpty()) {
-                                    result.getKey(ptr);
-                                    ValueGetter valueGetter =
-                                            maintainer.createGetterFromKeyValues(
-                                                ImmutableBytesPtr.copyBytesIfNecessary(ptr),
-                                                results);
-                                    Put put = maintainer.buildUpdateMutation(kvBuilder,
-                                        valueGetter, ptr, results.get(0).getTimestamp(),
-                                        env.getRegion().getRegionInfo().getStartKey(),
-                                        env.getRegion().getRegionInfo().getEndKey());
-
-                                    if (txnProvider != null) {
-                                        put = txnProvider.markPutAsCommitted(put, ts, ts);
-                                    }
-                                    indexMutations.add(put);
-                                }
-                            }
-                            result.setKeyValues(results);
-                        } else if (isDelete) {
-                            // FIXME: the version of the Delete constructor without the lock
-                            // args was introduced in 0.94.4, thus if we try to use it here
-                            // we can no longer use the 0.94.2 version of the client.
-                            Cell firstKV = results.get(0);
-                            Delete delete = new Delete(firstKV.getRowArray(),
-                                firstKV.getRowOffset(), firstKV.getRowLength(),ts);
-                            if (replayMutations != null) {
-                                delete.setAttribute(REPLAY_WRITES, replayMutations);
-                            }
-                            mutations.add(delete);
-                            // force tephra to ignore this delete
-                            delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
-                        } else if (isUpsert) {
-                            Arrays.fill(values, null);
-                            int bucketNumOffset = 0;
-                            if (projectedTable.getBucketNum() != null) {
-                                values[0] = new byte[] { 0 };
-                                bucketNumOffset = 1;
-                            }
-                            int i = bucketNumOffset;
-                            List<PColumn> projectedColumns = projectedTable.getColumns();
-                            for (; i < projectedTable.getPKColumns().size(); i++) {
-                                Expression expression = selectExpressions.get(i - bucketNumOffset);
-                                if (expression.evaluate(result, ptr)) {
-                                    values[i] = ptr.copyBytes();
-                                    // If SortOrder from expression in SELECT doesn't match the
-                                    // column being projected into then invert the bits.
-                                    if (expression.getSortOrder() !=
-                                            projectedColumns.get(i).getSortOrder()) {
-                                        SortOrder.invert(values[i], 0, values[i], 0,
-                                            values[i].length);
-                                    }
-                                }else{
-                                    values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
-                                }
-                            }
-                            projectedTable.newKey(ptr, values);
-                            PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false);
-                            for (; i < projectedColumns.size(); i++) {
-                                Expression expression = selectExpressions.get(i - bucketNumOffset);
-                                if (expression.evaluate(result, ptr)) {
-                                    PColumn column = projectedColumns.get(i);
-                                    if (!column.getDataType().isSizeCompatible(ptr, null,
-                                        expression.getDataType(), expression.getSortOrder(),
-                                        expression.getMaxLength(), expression.getScale(),
-                                        column.getMaxLength(), column.getScale())) {
-                                        throw new DataExceedsCapacityException(
-                                                column.getDataType(),
-                                                column.getMaxLength(),
-                                                column.getScale(),
-                                                column.getName().getString());
-                                    }
-                                    column.getDataType().coerceBytes(ptr, null,
-                                        expression.getDataType(), expression.getMaxLength(),
-                                        expression.getScale(), expression.getSortOrder(), 
-                                        column.getMaxLength(), column.getScale(),
-                                        column.getSortOrder(), projectedTable.rowKeyOrderOptimizable());
-                                    byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
-                                    row.setValue(column, bytes);
-                                }
-                            }
-                            for (Mutation mutation : row.toRowMutations()) {
-                                if (replayMutations != null) {
-                                    mutation.setAttribute(REPLAY_WRITES, replayMutations);
-                                } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) {
-                                    mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts);
-                                }
-                                mutations.add(mutation);
-                            }
-                            for (i = 0; i < selectExpressions.size(); i++) {
-                                selectExpressions.get(i).reset();
-                            }
-                        } else if (deleteCF != null && deleteCQ != null) {
-                            // No need to search for delete column, since we project only it
-                            // if no empty key value is being set
-                            if (emptyCF == null ||
-                                    result.getValue(deleteCF, deleteCQ) != null) {
-                                Delete delete = new Delete(results.get(0).getRowArray(),
-                                    results.get(0).getRowOffset(),
-                                    results.get(0).getRowLength());
-                                delete.addColumns(deleteCF,  deleteCQ, ts);
-                                // force tephra to ignore this delete
-                                delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
-                                mutations.add(delete);
-                            }
-                        }
-                        if (emptyCF != null) {
-                            /*
-                             * If we've specified an emptyCF, then we need to insert an empty
-                             * key value "retroactively" for any key value that is visible at
-                             * the timestamp that the DDL was issued. Key values that are not
-                             * visible at this timestamp will not ever be projected up to
-                             * scans past this timestamp, so don't need to be considered.
-                             * We insert one empty key value per row per timestamp.
-                             */
-                            Set<Long> timeStamps =
-                                    Sets.newHashSetWithExpectedSize(results.size());
-                            for (Cell kv : results) {
-                                long kvts = kv.getTimestamp();
-                                if (!timeStamps.contains(kvts)) {
-                                    Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
-                                        kv.getRowLength());
-                                    put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
-                                        ByteUtil.EMPTY_BYTE_ARRAY);
-                                    mutations.add(put);
-                                }
-                            }
-                        }
-                        if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
-                                txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
-                            mutations.clear();
-                        }
-                        // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
-
-                        if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
-                            setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
-                            commitBatch(region, indexMutations, blockingMemStoreSize);
-                            indexMutations.clear();
-                        }
-                        aggregators.aggregate(rowAggregators, result);
-                        hasAny = true;
-                    }
-                } while (hasMore);
-                if (!mutations.isEmpty()) {
-                    commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
-                        targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
-                    mutations.clear();
-                }
-
-                if (!indexMutations.isEmpty()) {
-                    commitBatch(region, indexMutations, blockingMemStoreSize);
-                    indexMutations.clear();
-                }
-            }
-        } finally {
-            if (needToWrite && incrScanRefCount) {
-                synchronized (lock) {
-                    scansReferenceCount--;
-                    if (scansReferenceCount < 0) {
-                        LOGGER.warn(
-                            "Scan reference count went below zero. Something isn't correct. Resetting it back to zero");
-                        scansReferenceCount = 0;
-                    }
-                    lock.notifyAll();
-                }
-            }
-            try {
-                if (targetHTable != null) {
-                    try {
-                        targetHTable.close();
-                    } catch (IOException e) {
-                        LOGGER.error("Closing table: " + targetHTable + " failed: ", e);
-                    }
-                }
-                if (targetHConn != null) {
-                    try {
-                        targetHConn.close();
-                    } catch (IOException e) {
-                        LOGGER.error("Closing connection: " + targetHConn + " failed: ", e);
-                    }
-                }
-            } finally {
-                try {
-                    innerScanner.close();
-                } finally {
-                    if (acquiredLock) region.closeRegionOperation();
-                }
-            }
-        }
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan)));
-        }
-
-        final boolean hadAny = hasAny;
-        Cell keyValue = null;
-        if (hadAny) {
-            byte[] value = aggregators.toBytes(rowAggregators);
-            keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
-                SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+        if (j != null) {
+            theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), env, useQualifierAsIndex, useNewValueColumnQualifier);
         }
-        final Cell aggKeyValue = keyValue;
-
-        RegionScanner scanner = new BaseRegionScanner(innerScanner) {
-            private boolean done = !hadAny;
-
-            @Override
-            public boolean isFilterDone() {
-                return done;
-            }
-
-            @Override
-            public boolean next(List<Cell> results) throws IOException {
-                if (done) return false;
-                done = true;
-                results.add(aggKeyValue);
-                return false;
-            }
-
-            @Override
-            public long getMaxResultSize() {
-                return scan.getMaxResultSize();
-            }
-        };
-        return scanner;
-
+        return new UngroupedAggregateRegionScanner(c, theScanner, region, scan, env, this);
     }
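
doPostScannerOpen() now just assembles the wrapped scanner chain and delegates
everything else to the new UngroupedAggregateRegionScanner. Since a paged scan can
presumably surface more than one partial aggregate, the client-side
UngroupedAggregatingResultIterator (also touched by this patch) would need to fold
the pages together, along these hypothetical lines:

    // Hypothetical sketch only; the names mirror the Phoenix aggregation API
    // used elsewhere in this diff (Aggregators#aggregate).
    Tuple tuple;
    while ((tuple = delegate.next()) != null) {
        aggregators.aggregate(rowAggregators, tuple); // fold in this page's partial result
    }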
 
-    private void checkForLocalIndexColumnFamilies(Region region,
-            List<IndexMaintainer> indexMaintainers) throws IOException {
+    public static void checkForLocalIndexColumnFamilies(Region region,
+                                                        List<IndexMaintainer> indexMaintainers) throws IOException {
         TableDescriptor tableDesc = region.getTableDescriptor();
         String schemaName =
                 tableDesc.getTableName().getNamespaceAsString()
@@ -956,13 +469,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         String tableName = SchemaUtil.getTableNameFromFullName(tableDesc.getTableName().getNameAsString());
         for (IndexMaintainer indexMaintainer : indexMaintainers) {
             Set<ColumnReference> coveredColumns = indexMaintainer.getCoveredColumns();
-            if(coveredColumns.isEmpty()) {
+            if (coveredColumns.isEmpty()) {
                 byte[] localIndexCf = indexMaintainer.getEmptyKeyValueFamily().get();
                // When the covered columns are empty we store the index data in the default column family, so check for it.
                 if (tableDesc.getColumnFamily(localIndexCf) == null) {
                     ServerUtil.throwIOException("Column Family Not Found",
-                        new ColumnFamilyNotFoundException(schemaName, tableName, Bytes
-                                .toString(localIndexCf)));
+                            new ColumnFamilyNotFoundException(schemaName, tableName, Bytes
+                                    .toString(localIndexCf)));
                 }
             }
             for (ColumnReference reference : coveredColumns) {
@@ -970,21 +483,21 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 ColumnFamilyDescriptor family = region.getTableDescriptor().getColumnFamily(cf);
                 if (family == null) {
                     ServerUtil.throwIOException("Column Family Not Found",
-                        new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf)));
+                            new ColumnFamilyNotFoundException(schemaName, tableName, Bytes.toString(cf)));
                 }
             }
         }
     }
 
-    private void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize,
-            byte[] indexMaintainersPtr, byte[] txState, final Table targetHTable, boolean useIndexProto,
-                        boolean isPKChanging, byte[] clientVersionBytes)
+    void commit(final Region region, List<Mutation> mutations, byte[] indexUUID, final long blockingMemStoreSize,
+                byte[] indexMaintainersPtr, byte[] txState, final Table targetHTable, boolean useIndexProto,
+                boolean isPKChanging, byte[] clientVersionBytes)
             throws IOException {
         final List<Mutation> localRegionMutations = Lists.newArrayList();
         final List<Mutation> remoteRegionMutations = Lists.newArrayList();
         setIndexAndTransactionProperties(mutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
         separateLocalAndRemoteMutations(targetHTable, region, mutations, localRegionMutations, remoteRegionMutations,
-            isPKChanging);
+                isPKChanging);
         commitBatchWithRetries(region, localRegionMutations, blockingMemStoreSize);
         try {
             commitBatchWithTable(targetHTable, remoteRegionMutations);
@@ -1006,7 +519,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     }
 
     private void handleIndexWriteException(final List<Mutation> localRegionMutations, IOException origIOE,
-            MutateCommand mutateCommand) throws IOException {
+                                           MutateCommand mutateCommand) throws IOException {
         long serverTimestamp = ServerUtil.parseTimestampFromRemoteException(origIOE);
         SQLException inferredE = ServerUtil.parseLocalOrRemoteServerException(origIOE);
         if (inferredE != null && inferredE.getErrorCode() == SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
@@ -1015,20 +528,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             for (Mutation mutation : localRegionMutations) {
                 if (PhoenixIndexMetaData.isIndexRebuild(mutation.getAttributesMap())) {
                     mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                        BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
+                            BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
                 } else {
                     mutation.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                        BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
+                            BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
                 }
                 // use the server timestamp for index write retries
                 PhoenixKeyValueUtil.setTimestamp(mutation, serverTimestamp);
             }
             IndexWriteException iwe = PhoenixIndexFailurePolicy.getIndexWriteException(inferredE);
             try (PhoenixConnection conn =
-                    QueryUtil.getConnectionOnServer(indexWriteConfig)
-                            .unwrap(PhoenixConnection.class)) {
+                         QueryUtil.getConnectionOnServer(indexWriteConfig)
+                                 .unwrap(PhoenixConnection.class)) {
                 PhoenixIndexFailurePolicy.doBatchWithRetries(mutateCommand, iwe, conn,
-                    indexWriteProps);
+                        indexWriteProps);
             } catch (Exception e) {
                 throw new DoNotRetryIOException(e);
             }
@@ -1039,14 +552,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     private void separateLocalAndRemoteMutations(Table targetHTable, Region region, List<Mutation> mutations,
                                                  List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations,
-                                                 boolean isPKChanging){
+                                                 boolean isPKChanging) {
         boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region);
         //if we're writing to the same table, but the PK can change, that means that some
         //mutations might be in our current region, and others in a different one.
         if (areMutationsInSameTable && isPKChanging) {
             RegionInfo regionInfo = region.getRegionInfo();
-            for (Mutation mutation : mutations){
-                if (regionInfo.containsRow(mutation.getRow())){
+            for (Mutation mutation : mutations) {
+                if (regionInfo.containsRow(mutation.getRow())) {
                     localRegionMutations.add(mutation);
                 } else {
                     remoteRegionMutations.add(mutation);
@@ -1066,9 +579,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     @Override
     public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-                           InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
-                           CompactionRequest request)
-                    throws IOException {
+                                      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+                                      CompactionRequest request) throws IOException {
         if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
             final TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
             // Compaction and split upcalls run with the effective user context of the requesting user.
@@ -1076,7 +588,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             // the login user. Switch to the login user context to ensure we have the expected
             // security context.
             return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
-                @Override public InternalScanner run() throws Exception {
+                @Override
+                public InternalScanner run() throws Exception {
                     InternalScanner internalScanner = scanner;
                     try {
                         long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
@@ -1084,8 +597,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 new DelegateRegionCoprocessorEnvironment(c.getEnvironment(),
                                         ConnectionType.COMPACTION_CONNECTION);
                         StatisticsCollector statisticsCollector = StatisticsCollectorFactory.createStatisticsCollector(
-                            compactionConfEnv, table.getNameAsString(), clientTimeStamp,
-                            store.getColumnFamilyDescriptor().getName());
+                                compactionConfEnv, table.getNameAsString(), clientTimeStamp,
+                                store.getColumnFamilyDescriptor().getName());
                         statisticsCollector.init();
                         internalScanner = statisticsCollector.createCompactionScanner(compactionConfEnv, store, internalScanner);
                     } catch (Exception e) {
@@ -1102,7 +615,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         return scanner;
     }
 
-    private static PTable deserializeTable(byte[] b) {
+    static PTable deserializeTable(byte[] b) {
         try {
             PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
             return PTableImpl.createFromProto(ptableProto);
@@ -1119,19 +632,19 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         } else {
             if (region.getTableDescriptor().hasCoprocessor(GlobalIndexChecker.class.getCanonicalName())) {
                 return new IndexRepairRegionScanner(innerScanner, region, scan, env, this);
-            } else  {
+            } else {
                 return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this);
             }
         }
-
     }
+
     private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan,
                                          final RegionCoprocessorEnvironment env) throws IOException {
         boolean oldCoproc = region.getTableDescriptor().hasCoprocessor(Indexer.class.getCanonicalName());
         byte[] valueBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_REBUILD_VERIFY_TYPE);
         IndexTool.IndexVerifyType verifyType = (valueBytes != null) ?
                 IndexTool.IndexVerifyType.fromValue(valueBytes) : IndexTool.IndexVerifyType.NONE;
-        if (oldCoproc  && verifyType == IndexTool.IndexVerifyType.ONLY) {
+        if (oldCoproc && verifyType == IndexTool.IndexVerifyType.ONLY) {
             return new IndexerRegionScanner(innerScanner, region, scan, env, this);
         }
         if (!scan.isRaw()) {
@@ -1158,9 +671,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
         return getRegionScanner(innerScanner, region, scan, env, oldCoproc);
     }
-    
+
     private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
-            final Region region, final Scan scan, Configuration config) throws IOException {
+                                       final Region region, final Scan scan, Configuration config) throws IOException {
         StatsCollectionCallable callable =
                 new StatsCollectionCallable(stats, region, innerScanner, config, scan);
         byte[] asyncBytes = scan.getAttribute(BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB);
@@ -1171,7 +684,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         long rowCount = 0; // in case of async, we report 0 as number of rows updated
         StatisticsCollectionRunTracker statsRunTracker =
                 StatisticsCollectionRunTracker.getInstance(config);
-        final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet());
+        final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(), scan.getFamilyMap().keySet());
         if (runUpdateStats) {
             if (!async) {
                 rowCount = callable.call();
@@ -1186,7 +699,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] rowCountBytes = PLong.INSTANCE.toBytes(Long.valueOf(rowCount));
         final Cell aggKeyValue =
                 PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY,
-                    SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
+                        SINGLE_COLUMN, AGG_TIMESTAMP, rowCountBytes, 0, rowCountBytes.length);
         RegionScanner scanner = new BaseRegionScanner(innerScanner) {
             @Override
             public RegionInfo getRegionInfo() {
@@ -1236,7 +749,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         private final Scan scan;
 
         StatsCollectionCallable(StatisticsCollector s, Region r, RegionScanner rs,
-                Configuration config, Scan scan) {
+                                Configuration config, Scan scan) {
             this.statsCollector = s;
             this.region = r;
             this.innerScanner = rs;
@@ -1307,7 +820,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    private static List<Expression> deserializeExpressions(byte[] b) {
+    static List<Expression> deserializeExpressions(byte[] b) {
         ByteArrayInputStream stream = new ByteArrayInputStream(b);
         try {
             DataInputStream input = new DataInputStream(stream);
@@ -1362,7 +875,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     // the coprocessors, to avoid a deadlock scenario. See PHOENIX-3111.
     private void waitForScansToFinish(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
         int maxWaitTime = c.getEnvironment().getConfiguration().getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-            HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+                HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
         long start = EnvironmentEdgeManager.currentTimeMillis();
         synchronized (lock) {
             isRegionClosingOrSplitting = true;
@@ -1372,10 +885,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     if (EnvironmentEdgeManager.currentTimeMillis() - start >= maxWaitTime) {
                         isRegionClosingOrSplitting = false; // must reset in case split is not retried
                         throw new IOException(String.format(
-                            "Operations like local index building/delete/upsert select"
-                                    + " might be going on so not allowing to split/close. scansReferenceCount=%s region=%s",
-                            scansReferenceCount,
-                            c.getEnvironment().getRegionInfo().getRegionNameAsString()));
+                                "Operations like local index building/delete/upsert select"
+                                        + " might be going on so not allowing to split/close. scansReferenceCount=%s region=%s",
+                                scansReferenceCount,
+                                c.getEnvironment().getRegionInfo().getRegionNameAsString()));
                     }
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -1386,7 +899,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
     @Override
     public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> c,
-            List<Pair<byte[], String>> familyPaths) throws IOException {
+                                 List<Pair<byte[], String>> familyPaths) throws IOException {
         // Don't allow bulkload if operations that need to read and write to the same region
         // are going on in the coprocessors, to avoid a deadlock scenario. See PHOENIX-3111.
         synchronized (lock) {
@@ -1416,6 +929,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // This will lead to failure of cross cluster RPC if the effective user is not
         // the login user. Switch to the login user context to ensure we have the expected
         // security context.
+
         final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
         // since we will make a call to syscat, do nothing if we are compacting syscat itself
         if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
@@ -1424,15 +938,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                 public Void run() throws Exception {
                     // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
                     try (PhoenixConnection conn =
-                             QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+                                 QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
                         PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
                         List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes();
                         // FIXME need to handle views and indexes on views as well
                         for (PTable index : indexes) {
                             if (index.getIndexDisableTimestamp() != 0) {
                                 LOGGER.info(
-                                    "Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
-                                        + fullTableName);
+                                        "Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
+                                                + fullTableName);
                                 options.setKeepDeletedCells(KeepDeletedCells.TRUE);
                                 options.readAllVersions();
                                 options.setTTL(Long.MAX_VALUE);
@@ -1444,8 +958,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             // non-Phoenix HBase tables won't be found, do nothing
                         } else {
                             LOGGER.error("Unable to modify compaction scanner to retain deleted cells for a table with disabled Index; "
-                                    + fullTableName,
-                                e);
+                                            + fullTableName, e);
                         }
                     }
                     return null;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
new file mode 100644
index 0000000..844273a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.REPLAY_WRITES;
+import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.checkForLocalIndexColumnFamilies;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeExpressions;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.deserializeTable;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.getBlockingMemstoreSize;
+import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.setIndexAndTransactionProperties;
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS;
+import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.exception.DataExceedsCapacityException;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.memory.InsufficientMemoryException;
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueSchema;
+import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PChar;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PDouble;
+import org.apache.phoenix.schema.types.PFloat;
+import org.apache.phoenix.transaction.PhoenixTransactionContext;
+import org.apache.phoenix.transaction.PhoenixTransactionProvider;
+import org.apache.phoenix.transaction.TransactionFactory;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ExpressionUtil;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.LogUtil;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(UngroupedAggregateRegionScanner.class);
+
+    private long pageSizeInMs = Long.MAX_VALUE;
+    private int maxBatchSize = 0;
+    private Scan scan;
+    private RegionScanner innerScanner;
+    private Region region;
+    private final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver;
+    private final RegionCoprocessorEnvironment env;
+    private final boolean useQualifierAsIndex;
+    private boolean needToWrite = false;
+    private final Pair<Integer, Integer> minMaxQualifiers;
+    private byte[][] values = null;
+    private PTable.QualifierEncodingScheme encodingScheme;
+    private PTable writeToTable = null;
+    private PTable projectedTable = null;
+    private boolean isDescRowKeyOrderUpgrade;
+    private final int offset;
+    private boolean buildLocalIndex;
+    private List<IndexMaintainer> indexMaintainers;
+    private boolean isPKChanging = false;
+    private long ts;
+    private PhoenixTransactionProvider txnProvider = null;
+    private UngroupedAggregateRegionObserver.MutationList indexMutations;
+    private boolean isDelete = false;
+    private byte[] replayMutations;
+    private boolean isUpsert = false;
+    private List<Expression> selectExpressions = null;
+    private byte[] deleteCQ = null;
+    private byte[] deleteCF = null;
+    private byte[] emptyCF = null;
+    private byte[] indexUUID;
+    private byte[] txState;
+    private byte[] clientVersionBytes;
+    private long blockingMemStoreSize;
+    private long maxBatchSizeBytes = 0L;
+    private Table targetHTable = null;
+    private boolean incrScanRefCount = false;
+    private byte[] indexMaintainersPtr;
+    private boolean useIndexProto;
+    private Connection targetHConn = null;
+
+    public UngroupedAggregateRegionScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
+                                           final RegionScanner innerScanner, final Region region, final Scan scan,
+                                           final RegionCoprocessorEnvironment env,
+                                           final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver)
+            throws IOException, SQLException {
+        super(innerScanner);
+        this.env = env;
+        this.region = region;
+        this.scan = scan;
+        this.ungroupedAggregateRegionObserver = ungroupedAggregateRegionObserver;
+        this.innerScanner = innerScanner;
+        Configuration conf = env.getConfiguration();
+        if (scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGING) != null) {
+            byte[] pageSizeFromScan =
+                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATE_PAGE_SIZE_IN_MS);
+            if (pageSizeFromScan != null) {
+                pageSizeInMs = Bytes.toLong(pageSizeFromScan);
+            } else {
+                pageSizeInMs =
+                        conf.getLong(UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS,
+                                QueryServicesOptions.DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS);
+            }
+        }
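+        // When paging is enabled, next() stops scanning once pageSizeInMs elapses and
+        // returns a partial aggregate; the client-side UngroupedAggregatingResultIterator
+        // merges these pages back into a single result.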
+        ts = scan.getTimeRange().getMax();
+        boolean localIndexScan = ScanUtil.isLocalIndex(scan);
+        encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+        int offsetToBe = 0;
+        if (localIndexScan) {
+            /*
+             * For local indexes, we need to set an offset on row key expressions to skip
+             * the region start key.
+             */
+            offsetToBe = region.getRegionInfo().getStartKey().length != 0
+                    ? region.getRegionInfo().getStartKey().length
+                    : region.getRegionInfo().getEndKey().length;
+            ScanUtil.setRowKeyOffset(scan, offsetToBe);
+        }
+        offset = offsetToBe;
+
+        byte[] descRowKeyTableBytes = scan.getAttribute(UPGRADE_DESC_ROW_KEY);
+        isDescRowKeyOrderUpgrade = descRowKeyTableBytes != null;
+        if (isDescRowKeyOrderUpgrade) {
+            LOGGER.debug("Upgrading row key for " + region.getRegionInfo().getTable().getNameAsString());
+            projectedTable = deserializeTable(descRowKeyTableBytes);
+            try {
+                writeToTable = PTableImpl.builderWithColumns(projectedTable,
+                        getColumnsToClone(projectedTable))
+                        .setRowKeyOrderOptimizable(true)
+                        .build();
+            } catch (SQLException e) {
+                ServerUtil.throwIOException("Upgrade failed", e); // Impossible
+            }
+            values = new byte[projectedTable.getPKColumns().size()][];
+        }
+        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+        boolean useProto = localIndexBytes != null;
+        if (localIndexBytes == null) {
+            localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+        }
+        indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto);
+        indexMutations = localIndexBytes == null
+                ? new UngroupedAggregateRegionObserver.MutationList()
+                : new UngroupedAggregateRegionObserver.MutationList(1024);
+
+        replayMutations = scan.getAttribute(REPLAY_WRITES);
+        indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID);
+        txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+        clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        if (txState != null) {
+            int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes);
+            txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion);
+        }
+        byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE);
+        if (upsertSelectTable != null) {
+            isUpsert = true;
+            projectedTable = deserializeTable(upsertSelectTable);
+            targetHConn = ConnectionFactory.createConnection(ungroupedAggregateRegionObserver.getUpsertSelectConfig());
+            targetHTable = targetHConn.getTable(
+                    TableName.valueOf(projectedTable.getPhysicalName().getBytes()));
+            selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
+            values = new byte[projectedTable.getPKColumns().size()][];
+            isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
+        } else {
+            byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
+            isDelete = isDeleteAgg != null && Bytes.compareTo(PDataType.TRUE_BYTES, isDeleteAgg) == 0;
+            if (!isDelete) {
+                deleteCF = scan.getAttribute(BaseScannerRegionObserver.DELETE_CF);
+                deleteCQ = scan.getAttribute(BaseScannerRegionObserver.DELETE_CQ);
+            }
+            emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_CF);
+        }
+        ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan);
+        useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+
+        /*
+         * Slow down the writes if the memstore size is more than
+         * (hbase.hregion.memstore.block.multiplier - 1) times hbase.hregion.memstore.flush.size
+         * bytes. This avoids a flush storm to HDFS for cases like index building, where reads
+         * and writes happen to all the table regions in the server.
+         */
+        blockingMemStoreSize = getBlockingMemstoreSize(region, conf);
+
+        buildLocalIndex = indexMaintainers != null && dataColumns == null && !localIndexScan;
+        if (buildLocalIndex) {
+            checkForLocalIndexColumnFamilies(region, indexMaintainers);
+        }
+        if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+                || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
+            needToWrite = true;
+            if (isUpsert && (targetHTable == null ||
+                    !targetHTable.getName().equals(region.getTableDescriptor().getTableName()))) {
+                needToWrite = false;
+            }
+            maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+            maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        }
+        minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(LogUtil.addCustomAnnotations(
+                    "Starting ungrouped coprocessor scan " + scan + " " + region.getRegionInfo(),
+                    ScanUtil.getCustomAnnotations(scan)));
+        }
+        useIndexProto = true;
+        indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+        // for backward compatibility, fall back to looking up the old attribute
+        if (indexMaintainersPtr == null) {
+            indexMaintainersPtr = scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+            useIndexProto = false;
+        }
+
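+        // Register this scanner with the observer so that a region close or split waits
+        // for in-flight server-side writes to finish (see PHOENIX-3111); the matching
+        // decrement happens in close().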
+        if (needToWrite) {
+            ungroupedAggregateRegionObserver.incrementScansReferenceCount();
+            incrScanRefCount = true;
+        }
+    }
+
+    @Override
+    public RegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (needToWrite && incrScanRefCount) {
+            ungroupedAggregateRegionObserver.decrementScansReferenceCount();
+        }
+        try {
+            if (targetHTable != null) {
+                try {
+                    targetHTable.close();
+                } catch (IOException e) {
+                    LOGGER.error("Closing table: " + targetHTable + " failed: ", e);
+                }
+            }
+            if (targetHConn != null) {
+                try {
+                    targetHConn.close();
+                } catch (IOException e) {
+                    LOGGER.error("Closing connection: " + targetHConn + " failed: ", e);
+                }
+            }
+        } finally {
+            innerScanner.close();
+        }
+    }
+
+    boolean descRowKeyOrderUpgrade(List<Cell> results, ImmutableBytesWritable ptr,
+                                   UngroupedAggregateRegionObserver.MutationList mutations) throws IOException {
+        Arrays.fill(values, null);
+        Cell firstKV = results.get(0);
+        RowKeySchema schema = projectedTable.getRowKeySchema();
+        int maxOffset = schema.iterator(firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(), ptr);
+        for (int i = 0; i < schema.getFieldCount(); i++) {
+            Boolean hasValue = schema.next(ptr, i, maxOffset);
+            if (hasValue == null) {
+                break;
+            }
+            ValueSchema.Field field = schema.getField(i);
+            if (field.getSortOrder() == SortOrder.DESC) {
+                // Special case for re-writing DESC ARRAY, as the actual byte value needs to change in this case
+                if (field.getDataType().isArrayType()) {
+                    field.getDataType().coerceBytes(ptr, null, field.getDataType(),
+                            field.getMaxLength(), field.getScale(), field.getSortOrder(),
+                            field.getMaxLength(), field.getScale(), field.getSortOrder(), true); // force to use correct separator byte
+                }
+                // Special case for re-writing DESC CHAR or DESC BINARY, to force the re-writing of trailing space characters
+                else if (field.getDataType() == PChar.INSTANCE || field.getDataType() == PBinary.INSTANCE) {
+                    int len = ptr.getLength();
+                    while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
+                        len--;
+                    }
+                    ptr.set(ptr.get(), ptr.getOffset(), len);
+                    // Special case for re-writing DESC FLOAT and DOUBLE, as they're not inverted like they should be (PHOENIX-2171)
+                } else if (field.getDataType() == PFloat.INSTANCE || field.getDataType() == PDouble.INSTANCE) {
+                    byte[] invertedBytes = SortOrder.invert(ptr.get(), ptr.getOffset(), ptr.getLength());
+                    ptr.set(invertedBytes);
+                }
+            } else if (field.getDataType() == PBinary.INSTANCE) {
+                // Remove trailing space characters so that the setValues call below will replace them
+                // with the correct zero byte character. Note this is somewhat dangerous as these
+                // could be legit, but I don't know what the alternative is.
+                int len = ptr.getLength();
+                while (len > 0 && ptr.get()[ptr.getOffset() + len - 1] == StringUtil.SPACE_UTF8) {
+                    len--;
+                }
+                ptr.set(ptr.get(), ptr.getOffset(), len);
+            }
+            values[i] = ptr.copyBytes();
+        }
+        writeToTable.newKey(ptr, values);
+        if (Bytes.compareTo(
+                firstKV.getRowArray(), firstKV.getRowOffset() + offset, firstKV.getRowLength(),
+                ptr.get(), ptr.getOffset() + offset, ptr.getLength()) == 0) {
+            return false;
+        }
+        byte[] newRow = ByteUtil.copyKeyBytesIfNecessary(ptr);
+        if (offset > 0) { // for local indexes (prepend region start key)
+            byte[] newRowWithOffset = new byte[offset + newRow.length];
+            System.arraycopy(firstKV.getRowArray(), firstKV.getRowOffset(), newRowWithOffset, 0, offset);
+            System.arraycopy(newRow, 0, newRowWithOffset, offset, newRow.length);
+            newRow = newRowWithOffset;
+        }
+        byte[] oldRow = Bytes.copy(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength());
+        for (Cell cell : results) {
+            // Copy existing cell but with new row key
+            Cell newCell =
+                    CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
+                            setRow(newRow).
+                            setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()).
+                            setQualifier(cell.getQualifierArray(),
+                                    cell.getQualifierOffset(), cell.getQualifierLength()).
+                            setTimestamp(cell.getTimestamp()).
+                            setType(cell.getType()).setValue(cell.getValueArray(),
+                            cell.getValueOffset(), cell.getValueLength()).build();
+            switch (cell.getType()) {
+                case Put:
+                    // If Put, point delete old Put
+                    Delete del = new Delete(oldRow);
+                    Cell newDelCell =
+                            CellBuilderFactory.create(CellBuilderType.DEEP_COPY).
+                                    setRow(newRow).
+                                    setFamily(cell.getFamilyArray(), cell.getFamilyOffset(),
+                                            cell.getFamilyLength()).
+                                    setQualifier(cell.getQualifierArray(),
+                                            cell.getQualifierOffset(), cell.getQualifierLength()).
+                                    setTimestamp(cell.getTimestamp()).
+                                    setType(Cell.Type.Delete).
+                                    setValue(ByteUtil.EMPTY_BYTE_ARRAY,
+                                            0, 0).build();
+                    del.add(newDelCell);
+                    mutations.add(del);
+
+                    Put put = new Put(newRow);
+                    put.add(newCell);
+                    mutations.add(put);
+                    break;
+                case Delete:
+                case DeleteColumn:
+                case DeleteFamily:
+                case DeleteFamilyVersion:
+                    Delete delete = new Delete(newRow);
+                    delete.add(newCell);
+                    mutations.add(delete);
+                    break;
+            }
+        }
+        return true;
+    }
+
+    void buildLocalIndex(Tuple result, List<Cell> results, ImmutableBytesWritable ptr) throws IOException {
+        for (IndexMaintainer maintainer : indexMaintainers) {
+            if (!results.isEmpty()) {
+                result.getKey(ptr);
+                ValueGetter valueGetter =
+                        maintainer.createGetterFromKeyValues(
+                                ImmutableBytesPtr.copyBytesIfNecessary(ptr),
+                                results);
+                Put put = maintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                        valueGetter, ptr, results.get(0).getTimestamp(),
+                        env.getRegion().getRegionInfo().getStartKey(),
+                        env.getRegion().getRegionInfo().getEndKey());
+
+                if (txnProvider != null) {
+                    put = txnProvider.markPutAsCommitted(put, ts, ts);
+                }
+                indexMutations.add(put);
+            }
+        }
+        result.setKeyValues(results);
+    }
+
+    void deleteRow(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+        // FIXME: the version of the Delete constructor without the lock
+        // args was introduced in 0.94.4, thus if we try to use it here
+        // we can no longer use the 0.94.2 version of the client.
+        Cell firstKV = results.get(0);
+        Delete delete = new Delete(firstKV.getRowArray(),
+                firstKV.getRowOffset(), firstKV.getRowLength(), ts);
+        if (replayMutations != null) {
+            delete.setAttribute(REPLAY_WRITES, replayMutations);
+        }
+        mutations.add(delete);
+        // force Tephra to ignore these deletes
+        delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+    }
+
+    void deleteCForQ(Tuple result, List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+        // No need to search for the delete column, since we project only it
+        // when no empty key value is being set
+        if (emptyCF == null ||
+                result.getValue(deleteCF, deleteCQ) != null) {
+            Delete delete = new Delete(results.get(0).getRowArray(),
+                    results.get(0).getRowOffset(),
+                    results.get(0).getRowLength());
+            delete.addColumns(deleteCF, deleteCQ, ts);
+            // force Tephra to ignore these deletes
+            delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
+            mutations.add(delete);
+        }
+    }
+
+    void upsert(Tuple result, ImmutableBytesWritable ptr, UngroupedAggregateRegionObserver.MutationList mutations) {
+        Arrays.fill(values, null);
+        int bucketNumOffset = 0;
+        if (projectedTable.getBucketNum() != null) {
+            values[0] = new byte[] { 0 };
+            bucketNumOffset = 1;
+        }
+        int i = bucketNumOffset;
+        List<PColumn> projectedColumns = projectedTable.getColumns();
+        for (; i < projectedTable.getPKColumns().size(); i++) {
+            Expression expression = selectExpressions.get(i - bucketNumOffset);
+            if (expression.evaluate(result, ptr)) {
+                values[i] = ptr.copyBytes();
+                // If SortOrder from expression in SELECT doesn't match the
+                // column being projected into then invert the bits.
+                if (expression.getSortOrder() !=
+                        projectedColumns.get(i).getSortOrder()) {
+                    SortOrder.invert(values[i], 0, values[i], 0,
+                            values[i].length);
+                }
+            } else {
+                values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
+            }
+        }
+        projectedTable.newKey(ptr, values);
+        PRow row = projectedTable.newRow(GenericKeyValueBuilder.INSTANCE, ts, ptr, false);
+        for (; i < projectedColumns.size(); i++) {
+            Expression expression = selectExpressions.get(i - bucketNumOffset);
+            if (expression.evaluate(result, ptr)) {
+                PColumn column = projectedColumns.get(i);
+                if (!column.getDataType().isSizeCompatible(ptr, null,
+                        expression.getDataType(), expression.getSortOrder(),
+                        expression.getMaxLength(), expression.getScale(),
+                        column.getMaxLength(), column.getScale())) {
+                    throw new DataExceedsCapacityException(
+                            column.getDataType(),
+                            column.getMaxLength(),
+                            column.getScale(),
+                            column.getName().getString());
+                }
+                column.getDataType().coerceBytes(ptr, null,
+                        expression.getDataType(), expression.getMaxLength(),
+                        expression.getScale(), expression.getSortOrder(),
+                        column.getMaxLength(), column.getScale(),
+                        column.getSortOrder(), projectedTable.rowKeyOrderOptimizable());
+                byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr);
+                row.setValue(column, bytes);
+            }
+        }
+        for (Mutation mutation : row.toRowMutations()) {
+            if (replayMutations != null) {
+                mutation.setAttribute(REPLAY_WRITES, replayMutations);
+            } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) {
+                mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts);
+            }
+            mutations.add(mutation);
+        }
+        for (i = 0; i < selectExpressions.size(); i++) {
+            selectExpressions.get(i).reset();
+        }
+    }
+
+    void insertEmptyKeyValue(List<Cell> results, UngroupedAggregateRegionObserver.MutationList mutations) {
+        Set<Long> timeStamps =
+                Sets.newHashSetWithExpectedSize(results.size());
+        for (Cell kv : results) {
+            long kvts = kv.getTimestamp();
+            if (!timeStamps.contains(kvts)) {
+                Put put = new Put(kv.getRowArray(), kv.getRowOffset(),
+                        kv.getRowLength());
+                put.addColumn(emptyCF, QueryConstants.EMPTY_COLUMN_BYTES, kvts,
+                        ByteUtil.EMPTY_BYTE_ARRAY);
+                mutations.add(put);
+            }
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> resultsToReturn) throws IOException {
+        boolean hasMore;
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        Configuration conf = env.getConfiguration();
+        final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
+        try (MemoryManager.MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
+            Aggregators aggregators = ServerAggregators.deserialize(
+                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), conf, em);
+            Aggregator[] rowAggregators = aggregators.getAggregators();
+            aggregators.reset(rowAggregators);
+            Cell lastCell = null;
+            boolean hasAny = false;
+            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+            Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+            UngroupedAggregateRegionObserver.MutationList mutations = new UngroupedAggregateRegionObserver.MutationList();
+            if (isDescRowKeyOrderUpgrade || isDelete || isUpsert
+                    || (deleteCQ != null && deleteCF != null) || emptyCF != null || buildLocalIndex) {
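+                // Allow ~10% headroom over maxBatchSize so a batch can fill completely
+                // before readyToCommit() forces a flush, without an immediate resize.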
+                mutations = new UngroupedAggregateRegionObserver.MutationList(Ints.saturatedCast(maxBatchSize + maxBatchSize / 10));
+            }
+            region.startRegionOperation();
+            try {
+                synchronized (innerScanner) {
+                    do {
+                        ungroupedAggregateRegionObserver.checkForRegionClosingOrSplitting();
+                        List<Cell> results = useQualifierAsIndex
+                                ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(),
+                                        minMaxQualifiers.getSecond(), encodingScheme)
+                                : new ArrayList<Cell>();
+                        // Results are potentially returned even when the return value of
+                        // nextRaw is false, since that return value indicates whether or not
+                        // there are more values after the ones returned
+                        hasMore = innerScanner.nextRaw(results);
+                        if (!results.isEmpty()) {
+                            lastCell = results.get(0);
+                            result.setKeyValues(results);
+                            if (isDescRowKeyOrderUpgrade) {
+                                if (!descRowKeyOrderUpgrade(results, ptr, mutations)) {
+                                    continue;
+                                }
+                            } else if (buildLocalIndex) {
+                                buildLocalIndex(result, results, ptr);
+                            } else if (isDelete) {
+                                deleteRow(results, mutations);
+                            } else if (isUpsert) {
+                                upsert(result, ptr, mutations);
+                            } else if (deleteCF != null && deleteCQ != null) {
+                                deleteCForQ(result, results, mutations);
+                            }
+                            if (emptyCF != null) {
+                                /*
+                                 * If we've specified an emptyCF, then we need to insert an empty
+                                 * key value "retroactively" for any key value that is visible at
+                                 * the timestamp that the DDL was issued. Key values that are not
+                                 * visible at this timestamp will not ever be projected up to
+                                 * scans past this timestamp, so don't need to be considered.
+                                 * We insert one empty key value per row per timestamp.
+                                 */
+                                insertEmptyKeyValue(results, mutations);
+                            }
+                            if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+                                ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+                                        txState, targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
+                                mutations.clear();
+                            }
+                            // Commit index mutations in batches based on
+                            // MUTATE_BATCH_SIZE_BYTES_ATTRIB in config
+                            if (ServerUtil.readyToCommit(indexMutations.size(), indexMutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) {
+                                setIndexAndTransactionProperties(indexMutations, indexUUID, indexMaintainersPtr, txState, clientVersionBytes, useIndexProto);
+                                ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize);
+                                indexMutations.clear();
+                            }
+                            aggregators.aggregate(rowAggregators, result);
+                            hasAny = true;
+                        }
+                    } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeInMs);
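+                    // A page ends either because the scanner is exhausted (hasMore is false)
+                    // or because the pageSizeInMs budget ran out; flush any buffered
+                    // mutations before returning the partial aggregate for this page.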
+
+                    if (!mutations.isEmpty()) {
+                        ungroupedAggregateRegionObserver.commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+                                targetHTable, useIndexProto, isPKChanging, clientVersionBytes);
+                        mutations.clear();
+                    }
+                    if (!indexMutations.isEmpty()) {
+                        ungroupedAggregateRegionObserver.commitBatch(region, indexMutations, blockingMemStoreSize);
+                        indexMutations.clear();
+                    }
+                }
+            } catch (InsufficientMemoryException e) {
+                throw new DoNotRetryIOException(e);
+            } catch (DataExceedsCapacityException e) {
+                throw new DoNotRetryIOException(e.getMessage(), e);
+            } catch (Throwable e) {
+                LOGGER.error("Exception in UngroupedAggreagteRegionScanner for region "
+                        + region.getRegionInfo().getRegionNameAsString(), e);
+                throw e;
+            }
+            Cell keyValue;
+            if (hasAny) {
+                byte[] value = aggregators.toBytes(rowAggregators);
+                keyValue = PhoenixKeyValueUtil.newKeyValue(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                        AGG_TIMESTAMP, value, 0, value.length);
+                resultsToReturn.add(keyValue);
+            }
+            return hasMore;
+        } finally {
+            region.closeRegionOperation();
+        }
+    }
+
+    @Override
+    public long getMaxResultSize() {
+        return scan.getMaxResultSize();
+    }
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 583baa8..3d7447a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -26,6 +26,7 @@ import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NO
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.THRESHOLD_NOT_REACHED;
 import static org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.UNINITIALIZED;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -39,10 +40,12 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
@@ -134,6 +137,7 @@ public class TableResultIterator implements ResultIterator {
         this.retry=plan.getContext().getConnection().getQueryServices().getProps()
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
         IndexUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection());
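+        // Opt in to paged server-side scans so long-running aggregates return control
+        // to the client in time-bounded pages (see UngroupedAggregateRegionScanner).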
+        scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGING, TRUE_BYTES);
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
index 0bf5982..d19c5b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java
@@ -21,6 +21,7 @@ import static org.apache.phoenix.query.QueryConstants.*;
 
 import java.sql.SQLException;
 
+import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.Aggregators;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
@@ -33,21 +34,37 @@ public class UngroupedAggregatingResultIterator extends GroupedAggregatingResult
     public UngroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
         super(resultIterator, aggregators);
     }
-    
     @Override
     public Tuple next() throws SQLException {
-        Tuple result = super.next();
-        // Ensure ungrouped aggregregation always returns a row, even if the underlying iterator doesn't.
-        if (result == null && !hasRows) {
-            // We should reset ClientAggregators here in case they are being reused in a new ResultIterator.
-            aggregators.reset(aggregators.getAggregators());
-            byte[] value = aggregators.toBytes(aggregators.getAggregators());
-            result = new SingleKeyValueTuple(
-                    PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, 
-                            SINGLE_COLUMN_FAMILY, 
-                            SINGLE_COLUMN, 
-                            AGG_TIMESTAMP, 
-                            value));
+        Tuple result = resultIterator.next();
+        if (result == null) {
+            // Ensure ungrouped aggregation always returns a row, even if the underlying iterator doesn't.
+            if (!hasRows) {
+                // We should reset ClientAggregators here in case they are being reused in a new ResultIterator.
+                aggregators.reset(aggregators.getAggregators());
+                byte[] value = aggregators.toBytes(aggregators.getAggregators());
+                result = new SingleKeyValueTuple(
+                        PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+                                SINGLE_COLUMN_FAMILY,
+                                SINGLE_COLUMN,
+                                AGG_TIMESTAMP,
+                                value));
+            }
+        } else {
+            Aggregator[] rowAggregators = aggregators.getAggregators();
+            aggregators.reset(rowAggregators);
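+            // With server paging each page arrives as its own partially aggregated
+            // tuple; fold all remaining pages into a single final aggregate here.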
+            while (true) {
+                aggregators.aggregate(rowAggregators, result);
+                Tuple nextResult = resultIterator.peek();
+                if (nextResult == null) {
+                    break;
+                }
+                result = resultIterator.next();
+            }
+
+            byte[] value = aggregators.toBytes(rowAggregators);
+            result = wrapKeyValueAsResult(PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+                    SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
         }
         hasRows = true;
         return result;
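
With server paging enabled, a scan may now return several partial ungrouped-aggregate rows (one per page) rather than exactly one, which is why next() above drains the iterator and re-aggregates on the client. A hedged illustration of the merge semantics using plain longs instead of the Phoenix Aggregator API (the values are made up):

    public class PartialAggregateMerge {
        public static void main(String[] args) {
            // Hypothetical partial COUNT(*) results, one per server page of the same scan
            long[] partialCounts = {40L, 60L};
            long total = 0;
            for (long partial : partialCounts) {
                total += partial; // mirrors the aggregate() loop in next() above
            }
            // total == 100, identical to what a single unpaged scan would have returned
            System.out.println(total);
        }
    }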
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e45c30e..b50af9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -327,6 +327,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled";
     // The number of index rows to be rebuild in one RPC call
     public static final String INDEX_REBUILD_PAGE_SIZE_IN_ROWS = "phoenix.index.rebuild_page_size_in_rows";
+    // The time limit, in milliseconds, on the scanning work done for an ungrouped aggregate in one RPC call
+    public static final String UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = "phoenix.ungrouped.aggregate_page_size_in_ms";
 
 
     // Before 4.15 when we created a view we included the parent table column metadata in the view
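
Since the paging decision is made inside the region observer, the new property has to be visible to the server-side Configuration. A minimal sketch of overriding the default for an embedded or test cluster (500 is an arbitrary example value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;

    public class PageSizeConfig {
        public static Configuration withShortPages() {
            Configuration conf = HBaseConfiguration.create();
            // 500ms pages: the scanner yields a partial aggregate row at least twice a second
            conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 500);
            return conf;
        }
    }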
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0a73ca9..2122310 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -337,7 +337,8 @@ public class QueryServicesOptions {
 
     public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
-    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
+    public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1000;
+    public static final long DEFAULT_UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS = 1000; // 1 second
 
     public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
 
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index b2a346f..ccc72ec 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.query;
 import static org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY;
 import static org.apache.phoenix.query.QueryConstants.MILLIS_IN_DAY;
 import static org.apache.phoenix.query.QueryServices.DROP_METADATA_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -140,7 +141,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
-import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableAlreadyExistsException;
 import org.apache.phoenix.schema.TableNotFoundException;
@@ -626,6 +626,13 @@ public abstract class BaseTest {
         conf.setInt(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, 10000);
         conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
         conf.setInt(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 1);
+        conf.setInt(GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 0);
+        if (conf.getLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0) == 0) {
+            // Explicitly set a 0ms page size unless a test has already overridden it. A 0ms page
+            // allows only one row to be processed in each next operation of the aggregate region
+            // scanner, i.e., a 0ms page is equivalent to a one-row page.
+            conf.setLong(QueryServices.UNGROUPED_AGGREGATE_PAGE_SIZE_IN_MS, 0);
+        }
         return conf;
     }
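
The 0ms page used above relies on the elapsed-time check running after every row. A hedged sketch of the loop shape the new UngroupedAggregateRegionScanner presumably uses; all names below are illustrative stand-ins, not the actual Phoenix internals:

    public class PagingLoopSketch {
        // Hypothetical stand-in for scanning and aggregating one region row
        interface RowStep { boolean scanAndAggregateOneRow(); }

        static boolean nextPage(RowStep step, long pageSizeMs) {
            long start = System.currentTimeMillis();
            boolean hasMore = true;
            while (hasMore) {
                hasMore = step.scanAndAggregateOneRow();
                // With pageSizeMs == 0 this fires after the first row, which is why a
                // 0ms page behaves as a one-row page in the BaseTest setup above
                if (System.currentTimeMillis() - start >= pageSizeMs) {
                    break;
                }
            }
            return hasMore; // true: a partial page was emitted and the client will merge
        }
    }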