You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2015/02/27 20:16:23 UTC

phoenix git commit: Revert "Surface partial saves in CommitExcepiton (PHOENIX-900) from https://github.com/apache/phoenix/pull/37"

Repository: phoenix
Updated Branches:
  refs/heads/master fa58c7821 -> 569469a46


Revert "Surface partial saves in CommitExcepiton (PHOENIX-900) from https://github.com/apache/phoenix/pull/37"

This reverts commit fa58c7821a2e8fce30a8c0ff6e42aa00134dbce0.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/569469a4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/569469a4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/569469a4

Branch: refs/heads/master
Commit: 569469a46bae57cc4d6cbbcd7e01d535560f07e2
Parents: fa58c78
Author: Eli Levine <el...@apache.org>
Authored: Fri Feb 27 11:15:28 2015 -0800
Committer: Eli Levine <el...@apache.org>
Committed: Fri Feb 27 11:15:28 2015 -0800

----------------------------------------------------------------------
 .../apache/phoenix/execute/PartialCommitIT.java | 302 -------------------
 .../apache/phoenix/compile/DeleteCompiler.java  |  13 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |  13 +-
 .../apache/phoenix/execute/CommitException.java |  35 +--
 .../apache/phoenix/execute/MutationState.java   | 156 ++++------
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  37 +--
 .../phoenix/jdbc/PhoenixPreparedStatement.java  |   7 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   3 -
 .../phoenix/execute/MutationStateTest.java      |  64 ----
 .../java/org/apache/phoenix/query/BaseTest.java |   2 +-
 10 files changed, 89 insertions(+), 543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
deleted file mode 100644
index 550d7de..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Copyright 2014 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import static com.google.common.collect.Lists.newArrayList;
-import static com.google.common.collect.Sets.newHashSet;
-import static java.util.Collections.singletonList;
-import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
-import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
-import static org.apache.phoenix.util.TestUtil.LOCALHOST;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
-import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-@Category(NeedsOwnMiniClusterTest.class)
-public class PartialCommitIT {
-    
-    private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase();
-    private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
-    private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
-    private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table";
-    private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + "  where k='z'";
-    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-    private static String url;
-    private static Driver driver;
-    private static final Properties props = new Properties();
-    
-    static {
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
-    }
-    
-    @BeforeClass
-    public static void setupCluster() throws Exception {
-      Configuration conf = TEST_UTIL.getConfiguration();
-      setUpConfigForMiniCluster(conf);
-      conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class);
-      conf.setBoolean("hbase.coprocessor.abortonerror", false);
-      conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
-      TEST_UTIL.startMiniCluster();
-      String clientPort = TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
-      url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
-              + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-
-      Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
-      // Must update config before starting server
-      props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-      driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
-      createTablesWithABitOfData();
-    }
-    
-    private static void createTablesWithABitOfData() throws Exception {
-        Properties props = new Properties();
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
-
-        try (Connection con = driver.connect(url, new Properties())) {
-            Statement sta = con.createStatement();
-            sta.execute("create table a_success_table (k varchar primary key, c varchar)");
-            sta.execute("create table b_failure_table (k varchar primary key, c varchar)");
-            sta.execute("create table c_success_table (k varchar primary key, c varchar)");
-            con.commit();
-        }
-
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
-
-        try (Connection con = driver.connect(url, new Properties())) {
-            con.setAutoCommit(false);
-            Statement sta = con.createStatement();
-            for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) {
-                sta.execute("upsert into " + table + " values ('z', 'z')");
-                sta.execute("upsert into " + table + " values ('zz', 'zz')");
-                sta.execute("upsert into " + table + " values ('zzz', 'zzz')");
-            }
-            con.commit();
-        }
-    }
-    
-    @AfterClass
-    public static void teardownCluster() throws Exception {
-      TEST_UTIL.shutdownMiniCluster();
-    }
-    
-    @Test
-    public void testNoFailure() {
-        testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false,
-                                        singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(new Integer(1)));
-    }
-    
-    @Test
-    public void testUpsertFailure() {
-        testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')", 
-                                       UPSERT_TO_FAIL, 
-                                       "upsert into a_success_table values ('testUpsertFailure2', 'b')"), 
-                                       1, new int[]{1}, true,
-                                       newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
-                                       newArrayList(new Integer(2), new Integer(0)));
-    }
-    
-    @Test
-    public void testUpsertSelectFailure() throws SQLException {
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
-
-        try (Connection con = driver.connect(url, new Properties())) {
-            con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')");
-            con.commit();
-        }
-        
-        testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')", 
-                                       UPSERT_SELECT_TO_FAIL), 
-                                       1, new int[]{1}, true,
-                                       newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
-                                       newArrayList(new Integer(2), new Integer(0)));
-    }
-    
-    @Test
-    public void testDeleteFailure() {
-        testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')", 
-                                       DELETE_TO_FAIL,
-                                       "upsert into a_success_table values ('testDeleteFailure2', 'b')"), 
-                                       1, new int[]{1}, true,
-                                       newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), 
-                                       newArrayList(new Integer(2), new Integer(1)));
-    }
-    
-    /**
-     * {@link MutationState} keeps mutations ordered lexicographically by table name.
-     */
-    @Test
-    public void testOrderOfMutationsIsPredicatable() {
-        testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
-                                       UPSERT_TO_FAIL, 
-                                       "upsert into a_success_table values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
-                                       2, new int[]{0,1}, true,
-                                       newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
-                                       newArrayList(new Integer(0), new Integer(1), new Integer(0)));
-    }
-    
-    @Test
-    public void checkThatAllStatementTypesMaintainOrderInConnection() {
-        testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", 
-                                       "upsert into a_success_table select k, c from c_success_table",
-                                       DELETE_TO_FAIL,
-                                       "select * from a_success_table", 
-                                       UPSERT_TO_FAIL), 
-                                       2, new int[]{2,4}, true,
-                                       newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), 
-                                       newArrayList(new Integer(4), new Integer(0), new Integer(1)));
-    }
-    
-    private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail,
-                                   List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) {
-        Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size());
-        
-        try (Connection con = getConnectionWithTableOrderPreservingMutationState()) {
-            con.setAutoCommit(false);
-            Statement sta = con.createStatement();
-            for (String statement : statements) {
-                sta.execute(statement);
-            }
-            try {
-                con.commit();
-                if (willFail) {
-                    fail("Expected at least one statement in the list to fail");
-                } else {
-                    assertEquals(0, con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should have been reset to 0 in commit()
-                }
-            } catch (SQLException sqle) {
-                if (!willFail) {
-                    fail("Expected no statements to fail");
-                }
-                assertEquals(CommitException.class, sqle.getClass());
-                int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
-                assertEquals(failureCount, uncommittedStatementIndexes.length);
-                assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
-            }
-            
-            // verify data in HBase
-            for (int i = 0; i < countStatementsForVerification.size(); i++) {
-                String countStatement = countStatementsForVerification.get(i);
-                ResultSet rs = sta.executeQuery(countStatement);
-                if (!rs.next()) {
-                    fail("Expected a single row from count query");
-                }
-                assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1));
-            }
-        } catch (SQLException e) {
-            fail(e.toString());
-        }
-    }
-    
-    private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
-        Connection con = driver.connect(url, new Properties());
-        PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
-        final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
-        return new PhoenixConnection(phxCon) {
-            protected MutationState newMutationState(int maxSize) {
-                return new MutationState(maxSize, this, mutations);
-            };
-        };
-    }
-    
-    public static class FailingRegionObserver extends SimpleRegionObserver {
-        @Override
-        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
-                final Durability durability) throws HBaseIOException {
-            if (shouldFailUpsert(c, put) || shouldFailDelete(c, put)) {
-                // throwing anything other than instances of IOException result
-                // in this coprocessor being unloaded
-                // DoNotRetryIOException tells HBase not to retry this mutation
-                // multiple times
-                throw new DoNotRetryIOException();
-            }
-        }
-        
-        private static boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
-            String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-            return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow());
-        }
-        
-        private static boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
-            String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-            return TABLE_NAME_TO_FAIL.equals(tableName) &&  
-                   // Phoenix deletes are sent as Puts with empty values
-                   put.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0; 
-        }
-    }
-    
-    /**
-     * Used for ordering {@link MutationState#mutations} map.
-     */
-    private static class TableRefComparator implements Comparator<TableRef> {
-        @Override
-        public int compare(TableRef tr1, TableRef tr2) {
-            return tr1.getTable().getPhysicalName().getString().compareTo(tr2.getTable().getPhysicalName().getString());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 6f51a4c..322d24a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -39,7 +39,6 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
@@ -107,8 +106,8 @@ public class DeleteCompiler {
         ConnectionQueryServices services = connection.getQueryServices();
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-        Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
-        Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
+        Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+        Map<ImmutableBytesPtr,Map<PColumn,byte[]>> indexMutations = null;
         // If indexTableRef is set, we're deleting the rows from both the index table and
         // the data table through a single query to save executing an additional one.
         if (indexTableRef != null) {
@@ -148,11 +147,11 @@ public class DeleteCompiler {
                     }
                     table.newKey(ptr, values);
                 }
-                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+                mutations.put(ptr, PRow.DELETE_MARKER);
                 if (indexTableRef != null) {
                     ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
                     rs.getCurrentRow().getKey(indexPtr);
-                    indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+                    indexMutations.put(indexPtr, PRow.DELETE_MARKER);
                 }
                 if (mutations.size() > maxSize) {
                     throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -430,9 +429,9 @@ public class DeleteCompiler {
                         // keys for our ranges
                         ScanRanges ranges = context.getScanRanges();
                         Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); 
-                        Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+                        Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
                         while (iterator.hasNext()) {
-                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER);
                         }
                         return new MutationState(tableRef, mutation, 0, maxSize, connection);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index f172814..b21cc2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -42,7 +42,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
-import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
@@ -96,7 +95,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class UpsertCompiler {
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement) {
+    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation) {
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -115,7 +114,7 @@ public class UpsertCompiler {
         }
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
         table.newKey(ptr, pkValues);
-        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
+        mutation.put(ptr, columnValues);
     }
 
     private static MutationState upsertSelect(PhoenixStatement statement, 
@@ -129,7 +128,7 @@ public class UpsertCompiler {
             boolean isAutoCommit = connection.getAutoCommit();
             byte[][] values = new byte[columnIndexes.length][];
             int rowCount = 0;
-            Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(batchSize);
             PTable table = tableRef.getTable();
             ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -157,7 +156,7 @@ public class UpsertCompiler {
                             column.getMaxLength(), column.getScale(), column.getSortOrder());
                     values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
@@ -803,8 +802,8 @@ public class UpsertCompiler {
                         throw new IllegalStateException();
                     }
                 }
-                Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
-                setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation, statement);
+                Map<ImmutableBytesPtr, Map<PColumn, byte[]>> mutation = Maps.newHashMapWithExpectedSize(1);
+                setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation);
                 return new MutationState(tableRef, mutation, 0, maxSize, connection);
             }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index a9d8311..63bf6a1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -19,32 +19,23 @@ package org.apache.phoenix.execute;
 
 import java.sql.SQLException;
 
-import org.apache.phoenix.jdbc.PhoenixConnection;
-
 public class CommitException extends SQLException {
-    private static final long serialVersionUID = 2L;
-    private final int[] uncommittedStatementIndexes;
+    private static final long serialVersionUID = 1L;
+    private final MutationState uncommittedState;
+    private final MutationState committedState;
 
-    public CommitException(Exception e, int[] uncommittedStatementIndexes) {
+    public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) {
         super(e);
-        this.uncommittedStatementIndexes = uncommittedStatementIndexes;
+        this.uncommittedState = uncommittedState;
+        this.committedState = committedState;
+    }
+
+    public MutationState getUncommittedState() {
+        return uncommittedState;
     }
 
-    /**
-     * Returns indexes of UPSERT and DELETE statements that have failed. Indexes returned
-     * correspond to each failed statement's order of creation within a {@link PhoenixConnection} up to
-     * commit/rollback.
-     * <p>
-     * Statements whose index is returned in this set correspond to one or more HBase mutations that have failed.
-     * <p>
-     * Statement indexes are maintained correctly for connections that mutate and query 
-     * <b>data</b> (DELETE, UPSERT and SELECT) only. Statement (and their subsequent failure) order
-     * is undefined for connections that execute metadata operations due to the fact that Phoenix rolls
-     * back connections after metadata mutations.
-     * 
-     * @see PhoenixConnection#getStatementExecutionCounter()
-     */
-    public int[] getUncommittedStatementIndexes() {
-    	return uncommittedStatementIndexes;
+    public MutationState getCommittedState() {
+        return committedState;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 8053f15..04626a6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.execute;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -62,11 +61,9 @@ import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.sun.istack.NotNull;
 
 /**
  * 
@@ -81,32 +78,40 @@ public class MutationState implements SQLCloseable {
     private PhoenixConnection connection;
     private final long maxSize;
     private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
-    private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
+    private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
     private long sizeOffset;
     private int numRows = 0;
 
-    MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) {
-        this.maxSize = maxSize;
-        this.connection = connection;
-        this.mutations = mutations;
-    }
-
-    public MutationState(long maxSize, PhoenixConnection connection) {
+    public MutationState(int maxSize, PhoenixConnection connection) {
         this(maxSize,connection,0);
     }
     
-    public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
-        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
+    public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) {
+        this.maxSize = maxSize;
+        this.connection = connection;
         this.sizeOffset = sizeOffset;
     }
     
-    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this(maxSize, connection, sizeOffset);
+    public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
+        this.maxSize = maxSize;
+        this.connection = connection;
         this.mutations.put(table, mutations);
+        this.sizeOffset = sizeOffset;
         this.numRows = mutations.size();
         throwIfTooBig();
     }
     
+    private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.sizeOffset = sizeOffset;
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
+            numRows += entry.getValue().size();
+            this.mutations.put(entry.getKey(), entry.getValue());
+        }
+        throwIfTooBig();
+    }
+    
     private void throwIfTooBig() {
         if (numRows > maxSize) {
             // TODO: throw SQLException ?
@@ -129,28 +134,29 @@ public class MutationState implements SQLCloseable {
         }
         this.sizeOffset += newMutation.sizeOffset;
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) {
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) {
             // Replace existing entries for the table with new entries
             TableRef tableRef = entry.getKey();
             PTable table = tableRef.getTable();
             boolean isIndex = table.getType() == PTableType.INDEX;
-            Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue());
+            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(tableRef, entry.getValue());
             if (existingRows != null) { // Rows for that table already exist
                 // Loop through new rows and replace existing with new
-                for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
+                for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
                     // Replace existing row with new row
-                	RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
-                    if (existingRowMutationState != null) {
-                    	Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
+                    Map<PColumn,byte[]> existingValues = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+                    if (existingValues != null) {
                         if (existingValues != PRow.DELETE_MARKER) {
-                            Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
+                            Map<PColumn,byte[]> newRow = rowEntry.getValue();
                             // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
                             if (newRow != PRow.DELETE_MARKER) {
-                                // Merge existing column values with new column values
-                                existingRowMutationState.join(rowEntry.getValue());
+                                // Replace existing column values with new column values
+                                for (Map.Entry<PColumn,byte[]> valueEntry : newRow.entrySet()) {
+                                    existingValues.put(valueEntry.getKey(), valueEntry.getValue());
+                                }
                                 // Now that the existing row has been merged with the new row, replace it back
-                                // again (since it was merged with the new one above).
-                                existingRows.put(rowEntry.getKey(), existingRowMutationState);
+                                // again (since it was replaced with the new one above).
+                                existingRows.put(rowEntry.getKey(), existingValues);
                             }
                         }
                     } else {
@@ -170,16 +176,16 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
-    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) {
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) {
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
                 (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? 
                         IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : 
                         Iterators.<PTable>emptyIterator();
         final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
-        Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator();
+        Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
         while (iterator.hasNext()) {
-            Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next();
+            Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
             ImmutableBytesPtr key = rowEntry.getKey();
             PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
             List<Mutation> rowMutations, rowMutationsPertainingToIndex;
@@ -191,7 +197,7 @@ public class MutationState implements SQLCloseable {
                 // delete rows).
                 rowMutationsPertainingToIndex = Collections.emptyList();
             } else {
-                for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
+                for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) {
                     row.setValue(valueEntry.getKey(), valueEntry.getValue());
                 }
                 rowMutations = row.toRowMutations();
@@ -243,14 +249,14 @@ public class MutationState implements SQLCloseable {
     }
     
     public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
-        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
         if (!iterator.hasNext()) {
             return Iterators.emptyIterator();
         }
         Long scn = connection.getSCN();
         final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
         return new Iterator<Pair<byte[],List<Mutation>>>() {
-            private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
+            private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next();
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
                     
             private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -291,7 +297,7 @@ public class MutationState implements SQLCloseable {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {
             TableRef tableRef = entry.getKey();
             long serverTimeStamp = tableRef.getTimeStamp();
             PTable table = tableRef.getTable();
@@ -306,15 +312,12 @@ public class MutationState implements SQLCloseable {
                         // TODO: use bitset?
                         table = result.getTable();
                         PColumn[] columns = new PColumn[table.getColumns().size()];
-                        for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
-                        	RowMutationState valueEntry = rowEntry.getValue();
-                            if (valueEntry != null) {
-                            	Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
-                            	if (colValues != PRow.DELETE_MARKER) {
-	                                for (PColumn column : colValues.keySet()) {
-	                                    columns[column.getPosition()] = column;
-	                                }
-                            	}
+                        for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
+                            Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
+                            if (valueEntry != PRow.DELETE_MARKER) {
+                                for (PColumn column : valueEntry.keySet()) {
+                                    columns[column.getPosition()] = column;
+                                }
                             }
                         }
                         for (PColumn column : columns) {
@@ -354,14 +357,15 @@ public class MutationState implements SQLCloseable {
         int i = 0;
         byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
         long[] serverTimeStamps = validate();
-        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
+        List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
 
         // add tracing for this operation
         TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
         Span span = trace.getSpan();
         while (iterator.hasNext()) {
-            Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
-            Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
+            Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
+            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
             TableRef tableRef = entry.getKey();
             PTable table = tableRef.getTable();
             table.getIndexMaintainers(tempPtr, connection);
@@ -421,6 +425,7 @@ public class MutationState implements SQLCloseable {
                         child.stop();
                         shouldRetry = false;
                         if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of  " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms", connection));
+                        committedList.add(entry);
                     } catch (Exception e) {
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                         if (inferredE != null) {
@@ -441,7 +446,9 @@ public class MutationState implements SQLCloseable {
                             }
                             e = inferredE;
                         }
-                        sqlE = new CommitException(e, getUncommittedSattementIndexes());
+                        // Throw to client with both what was committed so far and what is left to be committed.
+                        // That way, client can either undo what was done or try again with what was not done.
+                        sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
                     } finally {
                         try {
                             hTable.close();
@@ -481,64 +488,7 @@ public class MutationState implements SQLCloseable {
         numRows = 0;
     }
     
-    private int[] getUncommittedSattementIndexes() {
-    	int[] result = new int[0];
-    	for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
-    		for (RowMutationState rowMutationState : rowMutations.values()) {
-    			result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes());
-    		}
-    	}
-    	return result;
-    }
-    
     @Override
     public void close() throws SQLException {
     }
-    
-    public static int[] joinSortedIntArrays(int[] a, int[] b) {
-        int[] result = new int[a.length + b.length];
-        int i = 0, j = 0, k = 0, current;
-        while (i < a.length && j < b.length) {
-            current = a[i] < b[j] ? a[i++] : b[j++];
-            for ( ; i < a.length && a[i] == current; i++);
-            for ( ; j < b.length && b[j] == current; j++);
-            result[k++] = current;
-        }
-        while (i < a.length) {
-            for (current = a[i++] ; i < a.length && a[i] == current; i++);
-            result[k++] = current;
-        }
-        while (j < b.length) {
-            for (current = b[j++] ; j < b.length && b[j] == current; j++);
-            result[k++] = current;
-        }
-        return Arrays.copyOf(result, k);
-    }
-    
-    public static class RowMutationState {
-        private Map<PColumn,byte[]> columnValues;
-        private int[] statementIndexes;
-
-        public RowMutationState(@NotNull Map<PColumn,byte[]> columnValues, int statementIndex) {
-            Preconditions.checkNotNull(columnValues);
-
-            this.columnValues = columnValues;
-            this.statementIndexes = new int[] {statementIndex};
-        }
-
-        Map<PColumn, byte[]> getColumnValues() {
-            return columnValues;
-        }
-
-        int[] getStatementIndexes() {
-            return statementIndexes;
-        }
-        
-        void join(RowMutationState newRow) {
-            getColumnValues().putAll(newRow.getColumnValues());
-            statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
-        }
-        
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index c9ac94a..de9e323 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.function.FunctionArgumentType;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -122,21 +121,21 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private final Properties info;
     private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
-    private MutationState mutationState;
+    private final MutationState mutationState;
     private final int mutateBatchSize;
     private final Long scn;
     private boolean isAutoCommit = false;
     private PMetaData metaData;
     private final PName tenantId;
-    private final String datePattern; 
+    private final String datePattern;
     private final String timePattern;
     private final String timestampPattern;
-    private int statementExecutionCounter;
+    
     private boolean isClosed = false;
     private Sampler<?> sampler;
     private boolean readOnly = false;
-    private Map<String, String> customTracingAnnotations = emptyMap();
-    
+    private Map<String, String> customTracingAnnotations = emptyMap(); 
+ 
     static {
         Tracing.addTraceMetricsSource();
     }
@@ -151,20 +150,17 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache());
         this.isAutoCommit = connection.isAutoCommit;
         this.sampler = connection.sampler;
-        this.statementExecutionCounter = connection.statementExecutionCounter;
     }
     
     public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException {
         this(connection.getQueryServices(), connection, scn);
         this.sampler = connection.sampler;
-        this.statementExecutionCounter = connection.statementExecutionCounter;
     }
     
     public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
         this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache());
         this.isAutoCommit = connection.isAutoCommit;
         this.sampler = connection.sampler;
-        this.statementExecutionCounter = connection.statementExecutionCounter;
     }
     
     public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException {
@@ -237,7 +233,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
             }
             
         });
-        this.mutationState = newMutationState(maxSize);
+        this.mutationState = new MutationState(maxSize, this);
         this.services.addConnection(this);
 
         // setup tracing, if its enabled
@@ -365,10 +361,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         return metaData;
     }
 
-    protected MutationState newMutationState(int maxSize) {
-        return new MutationState(maxSize, this); 
-    }
-    
     public MutationState getMutationState() {
         return mutationState;
     }
@@ -434,7 +426,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
                 return null;
             }
         }, Tracing.withTracing(this, "committing mutations"));
-        statementExecutionCounter = 0;
     }
 
     @Override
@@ -635,7 +626,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     @Override
     public void rollback() throws SQLException {
         mutationState.rollback(this);
-        statementExecutionCounter = 0;
     }
 
     @Override
@@ -786,19 +776,4 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     public KeyValueBuilder getKeyValueBuilder() {
         return this.services.getKeyValueBuilder();
     }
-    
-    /**
-     * Used to track executions of {@link Statement}s and {@link PreparedStatement}s that were created from this connection before
-     * commit or rollback. 0-based. Used to associate partial save errors with SQL statements
-     * invoked by users.
-     * @see CommitException
-     * @see #incrementStatementExecutionCounter()
-     */
-    public int getStatementExecutionCounter() {
-		return statementExecutionCounter;
-	}
-    
-    public void incrementStatementExecutionCounter() {
-        statementExecutionCounter++;
-    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index a23484c..25be8c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -54,8 +54,8 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
 import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
-import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.SQLCloseable;
 
@@ -79,7 +79,8 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
 
     private final String query;
 
-    public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException, IOException {
+    public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException,
+            IOException {
         super(connection);
         this.statement = parser.nextStatement(new ExecutableNodeFactory());
         if (this.statement == null) { throw new EOFException(); }
@@ -88,7 +89,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
         this.parameters = Arrays.asList(new Object[statement.getBindCount()]);
         Collections.fill(parameters, BindManager.UNBOUND_PARAMETER);
     }
-    
+
     public PhoenixPreparedStatement(PhoenixConnection connection, String query) throws SQLException {
         super(connection);
         this.query = query;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index c6d086a..4ca5bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -151,7 +151,6 @@ import com.google.common.collect.Lists;
  * @since 0.1
  */
 public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
-	
     private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class);
     
     public enum Operation {
@@ -244,7 +243,6 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                         setLastResultSet(rs);
                         setLastUpdateCount(NO_UPDATE);
                         setLastUpdateOperation(stmt.getOperation());
-                        connection.incrementStatementExecutionCounter();
                         return rs;
                     } catch (RuntimeException e) {
                         // FIXME: Expression.evaluate does not throw SQLException
@@ -291,7 +289,6 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                                 int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, state.getUpdateCount());
                                 setLastUpdateCount(lastUpdateCount);
                                 setLastUpdateOperation(stmt.getOperation());
-                                connection.incrementStatementExecutionCounter();
                                 return lastUpdateCount;
                             } catch (RuntimeException e) {
                                 // FIXME: Expression.evaluate does not throw SQLException

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
deleted file mode 100644
index 67c3353..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2010 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you maynot use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicablelaw or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class MutationStateTest {
-
-    @Test
-    public void testJoinIntArrays() {
-        // simple case
-        int[] a = new int[] {1};
-        int[] b = new int[] {2};
-        int[] result = joinSortedIntArrays(a, b);
-        
-        assertEquals(2, result.length);
-        assertArrayEquals(new int[] {1,2}, result);
-        
-        // empty arrays
-        a = new int[0];
-        b = new int[0];
-        result = joinSortedIntArrays(a, b);
-        
-        assertEquals(0, result.length);
-        assertArrayEquals(new int[] {}, result);
-        
-        // dupes between arrays
-        a = new int[] {1,2,3};
-        b = new int[] {1,2,4};
-        result = joinSortedIntArrays(a, b);
-        
-        assertEquals(4, result.length);
-        assertArrayEquals(new int[] {1,2,3,4}, result);
-        
-        // dupes within arrays
-        a = new int[] {1,2,2,3};
-        b = new int[] {1,2,4};
-        result = joinSortedIntArrays(a, b);
-        
-        assertEquals(4, result.length);
-        assertArrayEquals(new int[] {1,2,3,4}, result);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/569469a4/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index b64eff8..9947440 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -655,7 +655,7 @@ public abstract class BaseTest {
      * Create a {@link PhoenixTestDriver} and register it.
      * @return an initialized and registered {@link PhoenixTestDriver} 
      */
-    public static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
+    protected static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
         PhoenixTestDriver newDriver = new PhoenixTestDriver(props);
         DriverManager.registerDriver(newDriver);
         Driver oldDriver = DriverManager.getDriver(url);