Posted to commits@phoenix.apache.org by an...@apache.org on 2017/01/31 06:18:31 UTC

phoenix git commit: PHOENIX-3271 Distribute UPSERT SELECT across cluster

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 29bed889d -> 496f376d2


PHOENIX-3271 Distribute UPSERT SELECT across cluster


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/496f376d
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/496f376d
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/496f376d

Branch: refs/heads/4.x-HBase-1.1
Commit: 496f376d23e91c01c1d98286b451a4acf765173c
Parents: 29bed88
Author: Ankit Singhal <an...@gmail.com>
Authored: Tue Jan 31 11:48:24 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Tue Jan 31 11:48:24 2017 +0530

----------------------------------------------------------------------
 .../phoenix/monitoring/PhoenixMetricsIT.java    | 56 ++++-------
 .../apache/phoenix/rpc/PhoenixServerRpcIT.java  |  6 ++
 .../apache/phoenix/compile/UpsertCompiler.java  | 38 ++++----
 .../UngroupedAggregateRegionObserver.java       | 97 ++++++++++++++++++--
 .../org/apache/phoenix/schema/PTableImpl.java   | 10 ++
 5 files changed, 138 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
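In brief: before this change an UPSERT SELECT could run inside the
coprocessor only when it wrote back into the very table it was reading
(and only when no PK expression could move a row to another region);
everything else was pulled to the client and re-written row by row. With
this patch the scanning region server writes the projected rows itself
and, when a mutation belongs to a different table or region, ships it
through a plain HTable.batch call, so the work is spread across the
cluster. A minimal sketch of the kind of statement that now runs
server-side (the JDBC URL and table names here are illustrative, not
from the patch):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class DistributedUpsertSelect {
        public static void main(String[] args) throws Exception {
            // Hypothetical URL; point it at your own cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().execute(
                    "CREATE TABLE IF NOT EXISTS SRC (K BIGINT NOT NULL PRIMARY KEY, V VARCHAR)");
                conn.createStatement().execute(
                    "CREATE TABLE IF NOT EXISTS DST (K BIGINT NOT NULL PRIMARY KEY, V VARCHAR)");
                // Auto-commit on is one of the preconditions for the server-side path.
                conn.setAutoCommit(true);
                // Source and target differ, which previously forced client-side
                // execution; it can now run in the coprocessor on each region.
                int rows = conn.createStatement().executeUpdate(
                    "UPSERT INTO DST SELECT * FROM SRC");
                System.out.println("rows upserted: " + rows);
            }
        }
    }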


http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 16a66df..4d075ab 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -439,18 +439,31 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
     public void testMetricsForUpsertSelectWithAutoCommit() throws Exception {
         String tableName1 = generateUniqueName();
         long table1SaltBuckets = 6;
-        String ddl = "CREATE TABLE " + tableName1 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
-                + table1SaltBuckets;
+        String ddl = "CREATE TABLE " + tableName1 + " (K BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, V VARCHAR)"
+                + " SALT_BUCKETS = " + table1SaltBuckets + ", IMMUTABLE_ROWS = true";
         Connection ddlConn = DriverManager.getConnection(getUrl());
         ddlConn.createStatement().execute(ddl);
         ddlConn.close();
         int numRows = 10;
-        insertRowsInTable(tableName1, numRows);
+        String dml = "UPSERT INTO " + tableName1 + " VALUES (?, ?)";
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PreparedStatement stmt = conn.prepareStatement(dml);
+            for (int i = 1; i <= numRows; i++) {
+                stmt.setLong(1, i);
+                stmt.setString(2, "value" + i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+        }
 
         String tableName2 = generateUniqueName();
-        ddl = "CREATE TABLE " + tableName2 + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = 10";
+        ddl = "CREATE TABLE " + tableName2 + " (K BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, V VARCHAR)"
+                + " SALT_BUCKETS = 10" + ", IMMUTABLE_ROWS = true";
         ddlConn = DriverManager.getConnection(getUrl());
         ddlConn.createStatement().execute(ddl);
+        String indexName = generateUniqueName();
+        ddl = "CREATE INDEX " + indexName + " ON " + tableName2 + " (V)";
+        ddlConn.createStatement().execute(ddl);
         ddlConn.close();
 
         String upsertSelect = "UPSERT INTO " + tableName2 + " SELECT * FROM " + tableName1;
@@ -602,41 +615,6 @@ public class PhoenixMetricsIT extends BaseUniqueNamesOwnClusterIT {
     }
     
     @Test
-    public void testMetricsForUpsertSelectSameTable() throws Exception {
-        String tableName = generateUniqueName();
-        long table1SaltBuckets = 6;
-        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + " SALT_BUCKETS = "
-                + table1SaltBuckets;
-        Connection ddlConn = DriverManager.getConnection(getUrl());
-        ddlConn.createStatement().execute(ddl);
-        ddlConn.close();
-        int numRows = 10;
-        insertRowsInTable(tableName, numRows);
-
-        Connection conn = DriverManager.getConnection(getUrl());
-        conn.setAutoCommit(false);
-        String upsertSelect = "UPSERT INTO " + tableName + " SELECT * FROM " + tableName;
-        conn.createStatement().executeUpdate(upsertSelect);
-        conn.commit();
-        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-        
-        Map<String, Map<String, Long>> mutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
-        // Because auto-commit is off, upsert select into the same table will run on the client.
-        // So we should have client side read and write metrics available.
-        assertMutationMetrics(tableName, numRows, mutationMetrics);
-        Map<String, Map<String, Long>> readMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
-        assertReadMetricsForMutatingSql(tableName, table1SaltBuckets, readMetrics);
-        PhoenixRuntime.resetMetrics(pConn);
-        // With autocommit on, still, this upsert select runs on the client side.
-        conn.setAutoCommit(true);
-        conn.createStatement().executeUpdate(upsertSelect);
-        Map<String, Map<String, Long>> autoCommitMutationMetrics = PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
-        Map<String, Map<String, Long>> autoCommitReadMetrics = PhoenixRuntime.getReadMetricsForMutationsSinceLastReset(pConn);
-        assertMetricsAreSame(mutationMetrics, autoCommitMutationMetrics, mutationMetricsToSkip);
-        assertMetricsAreSame(readMetrics, autoCommitReadMetrics, readMetricsToSkip);
-    }
-    
-    @Test
     public void testOpenConnectionsCounter() throws Exception {
         long numOpenConnections = GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue();
         try (Connection conn = DriverManager.getConnection(getUrl())) {

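A note on the test churn above: testMetricsForUpsertSelectSameTable is
removed because its premise, that an UPSERT SELECT into the same table
always runs on the client, no longer holds once the server-side path is
opened up. The surviving auto-commit test presumably switches to
ROW_TIMESTAMP, immutable-with-index tables precisely because those still
force client-side execution (see the runOnServer conditions in
UpsertCompiler below), keeping its client-metric assertions meaningful.
To observe the difference yourself, the same PhoenixRuntime calls the
test uses can be pointed at a server-side-eligible statement, roughly
like this (URL and table names are assumptions):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Map;

    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class UpsertSelectMetricsProbe {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(true);
                conn.createStatement().executeUpdate("UPSERT INTO DST SELECT * FROM SRC");
                PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
                // If the statement ran in the coprocessor, client-side write
                // metrics should be (close to) empty: no MutationState was
                // ever built on the client.
                Map<String, Map<String, Long>> writes =
                        PhoenixRuntime.getWriteMetricsForMutationsSinceLastReset(pConn);
                System.out.println("client write metrics: " + writes);
            }
        }
    }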
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
index 92f7294..410f02c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/PhoenixServerRpcIT.java
@@ -153,6 +153,12 @@ public class PhoenixServerRpcIT extends BaseUniqueNamesOwnClusterIT {
             
            // verify that the index queue is used only once (for the first upsert)
             Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
+            
+            TestPhoenixIndexRpcSchedulerFactory.reset();
+            conn.createStatement().execute(
+                    "CREATE INDEX " + indexName + "_1 ON " + dataTableFullName + " (v1) INCLUDE (v2)");
+            // verify that the index queue is used exactly once (during the server-side UPSERT SELECT that builds the index)
+            Mockito.verify(TestPhoenixIndexRpcSchedulerFactory.getIndexRpcExecutor()).dispatch(Mockito.any(CallRunner.class));
         }
         finally {
             conn.close();

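One detail worth spelling out in the addition above: a bare
Mockito.verify(mock) asserts exactly one invocation, so the test really
does check that the CREATE INDEX, which now populates the index through a
server-side UPSERT SELECT, touched the index RPC queue once and only
once. A standalone demo of that semantics (generic Mockito, not
Phoenix-specific):

    import java.util.List;

    import org.mockito.Mockito;

    public class VerifyOnceDemo {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            List<String> mock = Mockito.mock(List.class);
            mock.add("x");
            // verify(mock) is shorthand for verify(mock, times(1)): the
            // interaction must have happened exactly once, no more, no less.
            Mockito.verify(mock, Mockito.times(1)).add("x");
        }
    }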
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 32ce6ad..18070d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -85,6 +85,7 @@ import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PNameFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTable.ViewType;
@@ -506,7 +507,7 @@ public class UpsertCompiler {
                         && tableRefToBe.equals(selectResolver.getTables().get(0));
                     tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables());
                     /* We can run the upsert in a coprocessor if:
-                     * 1) from has only 1 table and the into table matches from table
+                     * 1) from has only 1 table
                      * 2) the select query isn't doing aggregation (which requires a client-side final merge)
                      * 3) autoCommit is on
                      * 4) the table is not immutable with indexes, as the client is the one that figures out the additional
@@ -523,9 +524,10 @@ public class UpsertCompiler {
                         parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe);
                         // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query,
                         // so we might be able to run it entirely on the server side.
-                        // For a table with row timestamp column, we can't guarantee that the row key will reside in the
                         // region space managed by region servers. So we bail out on executing on server side.
-                        runOnServer = sameTable && isAutoCommit && !table.isTransactional() && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1;
+                        runOnServer = isAutoCommit && !table.isTransactional()
+                                && !(table.isImmutableRows() && !table.getIndexes().isEmpty())
+                                && !select.isJoin() && table.getRowTimestampColPos() == -1;
                     }
                     // If we may be able to run on the server, add a hint that favors using the data table
                     // if all else is equal.
@@ -599,7 +601,6 @@ public class UpsertCompiler {
         if (valueNodes == null) {
             queryPlanToBe = new QueryOptimizer(services).optimize(queryPlanToBe, statement, targetColumns, parallelIteratorFactoryToBe);
             projectorToBe = queryPlanToBe.getProjector();
-            runOnServer &= queryPlanToBe.getTableRef().equals(tableRefToBe);
         }
         final List<PColumn> allColumns = allColumnsToBe;
         final RowProjector projector = projectorToBe;
@@ -657,41 +658,34 @@ public class UpsertCompiler {
                         Expression literalNull = LiteralExpression.newConstant(null, column.getDataType(), Determinism.ALWAYS);
                         projectedExpressions.add(literalNull);
                         allColumnsIndexes[pos] = column.getPosition();
-                    } 
+                    }
                     // Swap select expression at pos with i
                     Collections.swap(projectedExpressions, i, pos);
                     // Swap column indexes and reverse column indexes too
                     int tempPos = allColumnsIndexes[i];
                     allColumnsIndexes[i] = allColumnsIndexes[pos];
                     allColumnsIndexes[pos] = tempPos;
-                    reverseColumnIndexes[tempPos] = reverseColumnIndexes[i];
+                    reverseColumnIndexes[tempPos] = pos;
                     reverseColumnIndexes[i] = i;
                 }
-                // If any pk slots are changing, be conservative and don't run this server side.
-                // If the row ends up living in a different region, we'll get an error otherwise.
-                for (int i = 0; i < table.getPKColumns().size(); i++) {
-                    PColumn column = table.getPKColumns().get(i);
-                    Expression source = projectedExpressions.get(i);
-                    if (source == null || !source.equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) {
-                        // TODO: we could check the region boundaries to see if the pk will still be in it.
-                        runOnServer = false; // bail on running server side, since PK may be changing
-                        break;
-                    }
-                }
-                
+
                 ////////////////////////////////////////////////////////////////////
                 // UPSERT SELECT run server-side
                 /////////////////////////////////////////////////////////////////////
                 if (runOnServer) {
                     // Iterate through columns being projected
                     List<PColumn> projectedColumns = Lists.newArrayListWithExpectedSize(projectedExpressions.size());
-                    for (int i = 0; i < projectedExpressions.size(); i++) {
+                    int posOff = table.getBucketNum() != null ? 1 : 0;
+                    for (int i = 0; i < projectedExpressions.size(); i++) {
                         // Must make new column if position has changed
                         PColumn column = allColumns.get(allColumnsIndexes[i]);
-                        projectedColumns.add(column.getPosition() == i ? column : new PColumnImpl(column, i));
+                        projectedColumns.add(column.getPosition() == i + posOff ? column : new PColumnImpl(column, i));
                     }
                     // Build table from projectedColumns
-                    PTable projectedTable = PTableImpl.makePTable(table, projectedColumns);
+                    // Hack to add default column family to be used on server in case no value column is projected.
+                    PTable projectedTable = PTableImpl.makePTable(table, projectedColumns,
+                            PNameFactory.newName(SchemaUtil.getEmptyColumnFamily(table)));  
+                    
                     
                     SelectStatement select = SelectStatement.create(SelectStatement.COUNT_ONE, upsert.getHint());
                     StatementContext statementContext = queryPlan.getContext();
@@ -717,7 +711,7 @@ public class UpsertCompiler {
                     scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions));
                     
                     // Ignore order by - it has no impact
-                    final QueryPlan aggPlan = new AggregatePlan(context, select, tableRef, aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
+                    final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null);
                     return new MutationPlan() {
                         @Override
                         public ParameterMetaData getParameterMetaData() {

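The posOff/bucketNum bookkeeping above exists because a salted table's
first PK slot holds the one-byte salt, which no SELECT expression ever
projects: projected expression i therefore corresponds to PK column
i + posOff. A tiny standalone illustration of that offset (the values
are made up; this is plain Java, not the Phoenix API):

    public class SaltOffsetSketch {
        public static void main(String[] args) {
            // PK slots of a salted table: slot 0 is the salt byte.
            String[] pkSlots = { "<salt>", "K1", "K2" };
            // Expressions the SELECT actually projects for the PK.
            String[] projected = { "K1", "K2" };
            int posOff = 1; // table.getBucketNum() != null, as in the patch
            for (int i = 0; i < projected.length; i++) {
                System.out.println("expression " + i + " -> PK slot "
                        + (i + posOff) + " (" + pkSlots[i + posOff] + ")");
            }
        }
    }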
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index a888bb2..db3c792 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
@@ -88,12 +89,14 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
@@ -132,8 +135,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     // TODO: move all constants into a single class
     public static final String UNGROUPED_AGG = "UngroupedAgg";
     public static final String DELETE_AGG = "DeleteAgg";
-    public static final String UPSERT_SELECT_TABLE = "UpsertSelectTable";
-    public static final String UPSERT_SELECT_EXPRS = "UpsertSelectExprs";
     public static final String DELETE_CQ = "DeleteCQ";
     public static final String DELETE_CF = "DeleteCF";
     public static final String EMPTY_CF = "EmptyCF";
@@ -210,6 +211,40 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
       logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString());
       region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
+    
+    private void commitBatchWithHTable(HTable table, Region region, List<Mutation> mutations, byte[] indexUUID,
+            long blockingMemstoreSize, byte[] indexMaintainersPtr, byte[] txState) throws IOException {
+
+        if (indexUUID != null) {
+            // Need to add indexMaintainers for each mutation as table.batch can be distributed across servers
+            for (Mutation m : mutations) {
+                if (indexMaintainersPtr != null) {
+                    m.setAttribute(PhoenixIndexCodec.INDEX_MD, indexMaintainersPtr);
+                }
+                if (txState != null) {
+                    m.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
+                }
+                m.setAttribute(PhoenixIndexCodec.INDEX_UUID, indexUUID);
+            }
+        }
+        // When the memstore size reaches blockingMemstoreSize, wait up to 3 seconds
+        // for a flush, which shrinks the memstore and lets writes on the region proceed.
+        for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) {
+            try {
+                checkForRegionClosing();
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            }
+        }
+        logger.debug("Committing batch of " + mutations.size() + " mutations for " + table);
+        try {
+            table.batch(mutations);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
 
     /**
      * There is a chance that region might be closing while running balancer/move/merge. In this
@@ -308,12 +343,18 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         byte[] deleteCQ = null;
         byte[] deleteCF = null;
         byte[] emptyCF = null;
+        HTable targetHTable = null;
+        boolean areMutationInSameRegion = true;
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         if (upsertSelectTable != null) {
             isUpsert = true;
             projectedTable = deserializeTable(upsertSelectTable);
+            targetHTable = new HTable(env.getConfiguration(), projectedTable.getPhysicalName().getBytes());
             selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS));
             values = new byte[projectedTable.getPKColumns().size()][];
+            areMutationInSameRegion = Bytes.compareTo(targetHTable.getTableName(),
+                    region.getTableDesc().getTableName().getName()) == 0
+                    && !isPkPositionChanging(new TableRef(projectedTable), selectExpressions);
             
         } else {
             byte[] isDeleteAgg = scan.getAttribute(BaseScannerRegionObserver.DELETE_AGG);
@@ -522,10 +563,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
                         } else if (isUpsert) {
                             Arrays.fill(values, null);
-                            int i = 0;
+                            int bucketNumOffset = 0;
+                            if (projectedTable.getBucketNum() != null) {
+                                values[0] = new byte[] { 0 };
+                                bucketNumOffset = 1;
+                            }
+                            int i = bucketNumOffset;
                             List<PColumn> projectedColumns = projectedTable.getColumns();
                             for (; i < projectedTable.getPKColumns().size(); i++) {
-                                Expression expression = selectExpressions.get(i);
+                                Expression expression = selectExpressions.get(i - bucketNumOffset);
                                 if (expression.evaluate(result, ptr)) {
                                     values[i] = ptr.copyBytes();
                                     // If SortOrder from expression in SELECT doesn't match the
@@ -535,12 +581,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                         SortOrder.invert(values[i], 0, values[i], 0,
                                             values[i].length);
                                     }
+                                } else {
+                                    values[i] = ByteUtil.EMPTY_BYTE_ARRAY;
                                 }
                             }
                             projectedTable.newKey(ptr, values);
                             PRow row = projectedTable.newRow(kvBuilder, ts, ptr, false);
                             for (; i < projectedColumns.size(); i++) {
-                                Expression expression = selectExpressions.get(i);
+                                Expression expression = selectExpressions.get(i - bucketNumOffset);
                                 if (expression.evaluate(result, ptr)) {
                                     PColumn column = projectedColumns.get(i);
                                     if (!column.getDataType().isSizeCompatible(ptr, null,
@@ -605,8 +653,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             List<List<Mutation>> batchMutationList =
                                 MutationState.getMutationBatchList(batchSize, batchSizeBytes, mutations);
                             for (List<Mutation> batchMutations : batchMutationList) {
-                                commitBatch(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
-                                    txState);
+                                commit(region, batchMutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
+                                        txState, areMutationInSameRegion, targetHTable);
                                 batchMutations.clear();
                             }
                             mutations.clear();
@@ -624,7 +672,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     }
                 } while (hasMore);
                 if (!mutations.isEmpty()) {
-                    commitBatch(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState);
+                    commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr, txState,
+                            areMutationInSameRegion, targetHTable);
+                    mutations.clear();
                 }
 
                 if (!indexMutations.isEmpty()) {
@@ -638,6 +688,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                     scansReferenceCount--;
                 }
             }
+            if (targetHTable != null) {
+                targetHTable.close();
+            }
             try {
                 innerScanner.close();
             } finally {
@@ -678,8 +731,36 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
         };
         return scanner;
+
+    }
+
+    private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemstoreSize,
+            byte[] indexMaintainersPtr, byte[] txState, boolean areMutationsForSameRegion, HTable hTable)
+            throws IOException {
+        if (!areMutationsForSameRegion) {
+            assert hTable != null; // table cannot be null
+            commitBatchWithHTable(hTable, region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr,
+                    txState);
+        } else {
+            commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr, txState);
+        }
+    }
+
+    private boolean isPkPositionChanging(TableRef tableRef, List<Expression> projectedExpressions) throws SQLException {
+        // If the row ends up living in a different region, we'll get an error otherwise.
+        for (int i = 0; i < tableRef.getTable().getPKColumns().size(); i++) {
+            PColumn column = tableRef.getTable().getPKColumns().get(i);
+            Expression source = projectedExpressions.get(i);
+            if (source == null || !source
+                    .equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { return true; }
+        }
+        return false;
     }
 
+    private boolean readyToCommit(List<Mutation> mutations, int batchSize) {
+        return !mutations.isEmpty() && batchSize > 0 && mutations.size() > batchSize;
+    }
+
     @Override
     public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
             final InternalScanner scanner, final ScanType scanType) throws IOException {

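The shape of the new write path in this file: when the target table is
the scanned table and no PK expression can reposition a row (the new
isPkPositionChanging check), the old local region.batchMutate
short-circuit still applies; otherwise mutations travel through the
regular HBase client, with index metadata attached to every mutation
since the batch may fan out to many servers. A minimal sketch of that
remote leg, using the same deprecated HBase 1.1-era HTable API the patch
itself uses (the table name and mutation list are placeholders):

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Mutation;

    public class CrossRegionWriteSketch {
        // Mirrors the essential move of commitBatchWithHTable: HTable.batch
        // routes each mutation to whichever region server owns its row key,
        // which is what lets a coprocessor write rows outside its own region.
        static void writeAnywhere(Configuration conf, byte[] targetTable,
                List<Mutation> mutations) throws Exception {
            try (HTable table = new HTable(conf, targetTable)) {
                table.batch(mutations); // may fan out across regions and servers
            }
        }
    }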
http://git-wip-us.apache.org/repos/asf/phoenix/blob/496f376d/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 98a0b99..b4e0a06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -250,6 +250,16 @@ public class PTableImpl implements PTable {
                 table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
                 table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
     }
+    
+    public static PTableImpl makePTable(PTable table, Collection<PColumn> columns, PName defaultFamily) throws SQLException {
+        return new PTableImpl(
+                table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), table.getTimeStamp(),
+                table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(),
+                table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), defaultFamily, table.getViewStatement(),
+                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(),
+                table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(),
+                table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+    }
 
     public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns) throws SQLException {
         return new PTableImpl(