Posted to commits@phoenix.apache.org by el...@apache.org on 2015/04/22 02:50:44 UTC

phoenix git commit: PHOENIX-900 Partial results for mutations

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 9b7fefe28 -> 97f28b1cc


PHOENIX-900 Partial results for mutations


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/97f28b1c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/97f28b1c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/97f28b1c

Branch: refs/heads/4.x-HBase-0.98
Commit: 97f28b1cc3cffc24f9064874ffe19df5564962cf
Parents: 9b7fefe
Author: Eli Levine <el...@apache.org>
Authored: Tue Apr 21 17:49:30 2015 -0700
Committer: Eli Levine <el...@apache.org>
Committed: Tue Apr 21 17:49:30 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/PartialCommitIT.java | 302 +++++++++++++++++++
 .../apache/phoenix/compile/DeleteCompiler.java  |  13 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |  13 +-
 .../apache/phoenix/execute/CommitException.java |  35 ++-
 .../apache/phoenix/execute/MutationState.java   | 158 ++++++----
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  33 +-
 .../phoenix/jdbc/PhoenixPreparedStatement.java  |   7 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   3 +
 .../phoenix/execute/MutationStateTest.java      |  64 ++++
 .../java/org/apache/phoenix/query/BaseTest.java |   2 +-
 10 files changed, 542 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
new file mode 100644
index 0000000..550d7de
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -0,0 +1,302 @@
+/*
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Sets.newHashSet;
+import static java.util.Collections.singletonList;
+import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class PartialCommitIT {
+    
+    private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase();
+    private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
+    private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
+    private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table";
+    private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + " where k='z'";
+    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    private static String url;
+    private static Driver driver;
+    private static final Properties props = new Properties();
+    
+    static {
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, "10");
+    }
+    
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+      Configuration conf = TEST_UTIL.getConfiguration();
+      setUpConfigForMiniCluster(conf);
+      conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class);
+      conf.setBoolean("hbase.coprocessor.abortonerror", false);
+      conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+      TEST_UTIL.startMiniCluster();
+      String clientPort = TEST_UTIL.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
+      url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
+              + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+
+      Map<String, String> driverProps = Maps.newHashMapWithExpectedSize(1);
+      // Drop underlying HBase tables on DROP TABLE so test runs start clean
+      driverProps.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+      driver = initAndRegisterDriver(url, new ReadOnlyProps(driverProps.entrySet().iterator()));
+      createTablesWithABitOfData();
+    }
+    
+    private static void createTablesWithABitOfData() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, "10");
+
+        try (Connection con = driver.connect(url, props)) {
+            Statement sta = con.createStatement();
+            sta.execute("create table a_success_table (k varchar primary key, c varchar)");
+            sta.execute("create table b_failure_table (k varchar primary key, c varchar)");
+            sta.execute("create table c_success_table (k varchar primary key, c varchar)");
+            con.commit();
+        }
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, "100");
+
+        try (Connection con = driver.connect(url, props)) {
+            con.setAutoCommit(false);
+            Statement sta = con.createStatement();
+            for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) {
+                sta.execute("upsert into " + table + " values ('z', 'z')");
+                sta.execute("upsert into " + table + " values ('zz', 'zz')");
+                sta.execute("upsert into " + table + " values ('zzz', 'zzz')");
+            }
+            con.commit();
+        }
+    }
+    
+    @AfterClass
+    public static void teardownCluster() throws Exception {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+    
+    @Test
+    public void testNoFailure() {
+        testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false,
+                                        singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(Integer.valueOf(1)));
+    }
+    
+    @Test
+    public void testUpsertFailure() {
+        testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')", 
+                                       UPSERT_TO_FAIL, 
+                                       "upsert into a_success_table values ('testUpsertFailure2', 'b')"), 
+                                       1, new int[]{1}, true,
+                                       newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'",
+                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                       newArrayList(Integer.valueOf(2), Integer.valueOf(0)));
+    }
+    
+    @Test
+    public void testUpsertSelectFailure() throws SQLException {
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, "100");
+
+        try (Connection con = driver.connect(url, props)) {
+            con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')");
+            con.commit();
+        }
+        
+        testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')", 
+                                       UPSERT_SELECT_TO_FAIL), 
+                                       1, new int[]{1}, true,
+                                       newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')",
+                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                       newArrayList(Integer.valueOf(2), Integer.valueOf(0)));
+    }
+    
+    @Test
+    public void testDeleteFailure() {
+        testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')", 
+                                       DELETE_TO_FAIL,
+                                       "upsert into a_success_table values ('testDeleteFailure2', 'b')"), 
+                                       1, new int[]{1}, true,
+                                       newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'",
+                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), 
+                                       newArrayList(Integer.valueOf(2), Integer.valueOf(1)));
+    }
+    
+    /**
+     * Uses a connection whose {@link MutationState} keeps mutations ordered lexicographically
+     * by table name, so the order of failures is deterministic.
+     */
+    @Test
+    public void testOrderOfMutationsIsPredictable() {
+        testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredictable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
+                                       UPSERT_TO_FAIL,
+                                       "upsert into a_success_table values ('testOrderOfMutationsIsPredictable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
+                                       2, new int[]{0,1}, true,
+                                       newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredictable'",
+                                                    "select count(*) from a_success_table where k='testOrderOfMutationsIsPredictable'",
+                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"),
+                                       newArrayList(Integer.valueOf(0), Integer.valueOf(1), Integer.valueOf(0)));
+    }
+    
+    @Test
+    public void checkThatAllStatementTypesMaintainOrderInConnection() {
+        testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", 
+                                       "upsert into a_success_table select k, c from c_success_table",
+                                       DELETE_TO_FAIL,
+                                       "select * from a_success_table", 
+                                       UPSERT_TO_FAIL), 
+                                       2, new int[]{2,4}, true,
+                                       newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
+                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'",
+                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), 
+                                       newArrayList(new Integer(4), new Integer(0), new Integer(1)));
+    }
+    
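+    /**
+     * Executes the given statements on a single connection, commits, and verifies both the
+     * statement indexes reported by {@link CommitException} and the resulting row counts.
+     */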
+    private void testPartialCommit(List<String> statements, int failureCount, int[] expectedUncommittedStatementIndexes, boolean willFail,
+                                   List<String> countStatementsForVerification, List<Integer> expectedCountsForVerification) {
+        Preconditions.checkArgument(countStatementsForVerification.size() == expectedCountsForVerification.size());
+        
+        try (Connection con = getConnectionWithTableOrderPreservingMutationState()) {
+            con.setAutoCommit(false);
+            Statement sta = con.createStatement();
+            for (String statement : statements) {
+                sta.execute(statement);
+            }
+            try {
+                con.commit();
+                if (willFail) {
+                    fail("Expected at least one statement in the list to fail");
+                } else {
+                    assertEquals(0, con.unwrap(PhoenixConnection.class).getStatementExecutionCounter()); // should have been reset to 0 in commit()
+                }
+            } catch (SQLException sqle) {
+                if (!willFail) {
+                    fail("Expected no statements to fail");
+                }
+                assertEquals(CommitException.class, sqle.getClass());
+                int[] uncommittedStatementIndexes = ((CommitException)sqle).getUncommittedStatementIndexes();
+                assertEquals(failureCount, uncommittedStatementIndexes.length);
+                assertArrayEquals(expectedUncommittedStatementIndexes, uncommittedStatementIndexes);
+            }
+            
+            // verify data in HBase
+            for (int i = 0; i < countStatementsForVerification.size(); i++) {
+                String countStatement = countStatementsForVerification.get(i);
+                ResultSet rs = sta.executeQuery(countStatement);
+                if (!rs.next()) {
+                    fail("Expected a single row from count query");
+                }
+                assertEquals(expectedCountsForVerification.get(i).intValue(), rs.getInt(1));
+            }
+        } catch (SQLException e) {
+            fail(e.toString());
+        }
+    }
+    
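+    /**
+     * Returns a connection whose {@link MutationState} keeps its table map in a TreeMap
+     * ordered by physical table name, making the commit order (and thus the failed
+     * statement indexes) deterministic for these tests.
+     */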
+    private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
+        Connection con = driver.connect(url, new Properties());
+        PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
+        final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
+        return new PhoenixConnection(phxCon) {
+            @Override
+            protected MutationState newMutationState(int maxSize) {
+                return new MutationState(maxSize, this, mutations);
+            }
+        };
+    }
+    
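+    /**
+     * Region observer that fails Puts to b_failure_table: upserts of the 'fail me' row and
+     * Phoenix deletes (which arrive as Puts with empty values).
+     */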
+    public static class FailingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+                final Durability durability) throws HBaseIOException {
+            if (shouldFailUpsert(c, put) || shouldFailDelete(c, put)) {
+                // Throwing anything other than an instance of IOException results
+                // in this coprocessor being unloaded.
+                // DoNotRetryIOException tells HBase not to retry this mutation
+                // multiple times.
+                throw new DoNotRetryIOException();
+            }
+        }
+        
+        private static boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+            String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+            return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow());
+        }
+        
+        private static boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+            String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+            return TABLE_NAME_TO_FAIL.equals(tableName) &&  
+                   // Phoenix deletes are sent as Puts with empty values
+                   put.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0; 
+        }
+    }
+    
+    /**
+     * Used for ordering {@link MutationState#mutations} map.
+     */
+    private static class TableRefComparator implements Comparator<TableRef> {
+        @Override
+        public int compare(TableRef tr1, TableRef tr2) {
+            return tr1.getTable().getPhysicalName().getString().compareTo(tr2.getTable().getPhysicalName().getString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index b8e68f9..4f6a719 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.BaseQueryPlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
@@ -106,8 +107,8 @@ public class DeleteCompiler {
         ConnectionQueryServices services = connection.getQueryServices();
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-        Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize);
-        Map<ImmutableBytesPtr,Map<PColumn,byte[]>> indexMutations = null;
+        Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+        Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
         // If indexTableRef is set, we're deleting the rows from both the index table and
         // the data table through a single query to save executing an additional one.
         if (indexTableRef != null) {
@@ -147,11 +148,11 @@ public class DeleteCompiler {
                     }
                     table.newKey(ptr, values);
                 }
-                mutations.put(ptr, PRow.DELETE_MARKER);
+                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
                 if (indexTableRef != null) {
                     ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
                     rs.getCurrentRow().getKey(indexPtr);
-                    indexMutations.put(indexPtr, PRow.DELETE_MARKER);
+                    indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
                 }
                 if (mutations.size() > maxSize) {
                     throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -429,9 +430,9 @@ public class DeleteCompiler {
                         // keys for our ranges
                         ScanRanges ranges = context.getScanRanges();
                         Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); 
-                        Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+                        Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
                         while (iterator.hasNext()) {
-                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER);
+                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
                         }
                         return new MutationState(tableRef, mutation, 0, maxSize, connection);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 8a76564..2b35d4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
@@ -95,7 +96,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class UpsertCompiler {
-    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation) {
+    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement) {
         Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -114,7 +115,7 @@ public class UpsertCompiler {
         }
         ImmutableBytesPtr ptr = new ImmutableBytesPtr();
         table.newKey(ptr, pkValues);
-        mutation.put(ptr, columnValues);
+        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
     }
 
     private static MutationState upsertSelect(PhoenixStatement statement, 
@@ -128,7 +129,7 @@ public class UpsertCompiler {
             boolean isAutoCommit = connection.getAutoCommit();
             byte[][] values = new byte[columnIndexes.length][];
             int rowCount = 0;
-            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+            Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
             PTable table = tableRef.getTable();
             ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
             ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -156,7 +157,7 @@ public class UpsertCompiler {
                             column.getMaxLength(), column.getScale(), column.getSortOrder());
                     values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
                 }
-                setValues(values, pkSlotIndexes, columnIndexes, table, mutation);
+                setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
                 rowCount++;
                 // Commit a batch if auto commit is true and we're at our batch size
                 if (isAutoCommit && rowCount % batchSize == 0) {
@@ -802,8 +803,8 @@ public class UpsertCompiler {
                         throw new IllegalStateException();
                     }
                 }
-                Map<ImmutableBytesPtr, Map<PColumn, byte[]>> mutation = Maps.newHashMapWithExpectedSize(1);
-                setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation);
+                Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
+                setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation, statement);
                 return new MutationState(tableRef, mutation, 0, maxSize, connection);
             }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
index 63bf6a1..a9d8311 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CommitException.java
@@ -19,23 +19,32 @@ package org.apache.phoenix.execute;
 
 import java.sql.SQLException;
 
+import org.apache.phoenix.jdbc.PhoenixConnection;
+
 public class CommitException extends SQLException {
-    private static final long serialVersionUID = 1L;
-    private final MutationState uncommittedState;
-    private final MutationState committedState;
+    private static final long serialVersionUID = 2L;
+    private final int[] uncommittedStatementIndexes;
 
-    public CommitException(Exception e, MutationState uncommittedState, MutationState committedState) {
+    public CommitException(Exception e, int[] uncommittedStatementIndexes) {
         super(e);
-        this.uncommittedState = uncommittedState;
-        this.committedState = committedState;
-    }
-
-    public MutationState getUncommittedState() {
-        return uncommittedState;
+        this.uncommittedStatementIndexes = uncommittedStatementIndexes;
     }
 
-    public MutationState getCommittedState() {
-        return committedState;
+    /**
+     * Returns the indexes of the UPSERT and DELETE statements that have failed. Each index
+     * corresponds to a failed statement's 0-based order of execution within a
+     * {@link PhoenixConnection}, counted up to the commit or rollback.
+     * <p>
+     * Each statement whose index is returned here corresponds to one or more HBase mutations
+     * that have failed.
+     * <p>
+     * Statement indexes are maintained correctly only for connections that mutate and query
+     * <b>data</b> (DELETE, UPSERT and SELECT). Statement (and failure) order is undefined for
+     * connections that execute metadata operations, because Phoenix rolls back connections
+     * after metadata mutations.
+     *
+     * @see PhoenixConnection#getStatementExecutionCounter()
+     */
+    public int[] getUncommittedStatementIndexes() {
+        return uncommittedStatementIndexes;
     }
-
 }
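
For context, a minimal sketch of how client code might consume this new API; the JDBC URL,
table names, and values below are illustrative only, not part of this change:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    import org.apache.phoenix.execute.CommitException;

    public class PartialCommitExample {
        public static void main(String[] args) throws SQLException {
            try (Connection con = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                con.setAutoCommit(false);
                Statement sta = con.createStatement();
                sta.execute("upsert into t1 values ('a', '1')"); // statement index 0
                sta.execute("upsert into t2 values ('b', '2')"); // statement index 1
                try {
                    con.commit();
                } catch (CommitException e) {
                    // 0-based execution order since the last commit/rollback; each
                    // index maps to a statement whose mutations did not make it to
                    // HBase, so the client can retry or undo just those statements.
                    for (int i : e.getUncommittedStatementIndexes()) {
                        System.err.println("Statement " + i + " was not committed");
                    }
                }
            }
        }
    }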

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index b98d705..9ffa135 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.execute;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -59,9 +60,11 @@ import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import javax.annotation.Nonnull;
 import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES;
 import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME;
@@ -79,40 +82,32 @@ public class MutationState implements SQLCloseable {
     private PhoenixConnection connection;
     private final long maxSize;
     private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
-    private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
+    private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations;
     private long sizeOffset;
     private int numRows = 0;
     
-    public MutationState(int maxSize, PhoenixConnection connection) {
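+    // Package-private so tests can inject their own (e.g. TreeMap-ordered) mutations map.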
+    MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.mutations = mutations;
+    }
+
+    public MutationState(long maxSize, PhoenixConnection connection) {
         this(maxSize,connection,0);
     }
     
-    public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) {
-        this.maxSize = maxSize;
-        this.connection = connection;
+    public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset) {
+        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()));
         this.sizeOffset = sizeOffset;
     }
     
-    public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this.maxSize = maxSize;
-        this.connection = connection;
+    public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
+        this(maxSize, connection, sizeOffset);
         this.mutations.put(table, mutations);
-        this.sizeOffset = sizeOffset;
         this.numRows = mutations.size();
         throwIfTooBig();
     }
     
-    private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) {
-        this.maxSize = maxSize;
-        this.connection = connection;
-        this.sizeOffset = sizeOffset;
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) {
-            numRows += entry.getValue().size();
-            this.mutations.put(entry.getKey(), entry.getValue());
-        }
-        throwIfTooBig();
-    }
-    
     private void throwIfTooBig() {
         if (numRows > maxSize) {
             // TODO: throw SQLException ?
@@ -135,29 +130,28 @@ public class MutationState implements SQLCloseable {
         }
         this.sizeOffset += newMutation.sizeOffset;
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : newMutation.mutations.entrySet()) {
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) {
             // Replace existing entries for the table with new entries
             TableRef tableRef = entry.getKey();
             PTable table = tableRef.getTable();
             boolean isIndex = table.getType() == PTableType.INDEX;
-            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> existingRows = this.mutations.put(tableRef, entry.getValue());
+            Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue());
             if (existingRows != null) { // Rows for that table already exist
                 // Loop through new rows and replace existing with new
-                for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
+                for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
                     // Replace existing row with new row
-                    Map<PColumn,byte[]> existingValues = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
-                    if (existingValues != null) {
+                    RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+                    if (existingRowMutationState != null) {
+                        Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
                         if (existingValues != PRow.DELETE_MARKER) {
-                            Map<PColumn,byte[]> newRow = rowEntry.getValue();
+                            Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
                             // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
                             if (newRow != PRow.DELETE_MARKER) {
-                                // Replace existing column values with new column values
-                                for (Map.Entry<PColumn,byte[]> valueEntry : newRow.entrySet()) {
-                                    existingValues.put(valueEntry.getKey(), valueEntry.getValue());
-                                }
+                                // Merge existing column values with new column values
+                                existingRowMutationState.join(rowEntry.getValue());
                                 // Now that the existing row has been merged with the new row, replace it back
-                                // again (since it was replaced with the new one above).
-                                existingRows.put(rowEntry.getKey(), existingValues);
+                                // again (since it was merged with the new one above).
+                                existingRows.put(rowEntry.getKey(), existingRowMutationState);
                             }
                         }
                     } else {
@@ -177,20 +171,20 @@ public class MutationState implements SQLCloseable {
         throwIfTooBig();
     }
     
-    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, Map<PColumn, byte[]>> values, long timestamp, boolean includeMutableIndexes) {
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, long timestamp, boolean includeMutableIndexes) {
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
                 (tableRef.getTable().isImmutableRows() || includeMutableIndexes) ? 
                         IndexMaintainer.nonDisabledIndexIterator(tableRef.getTable().getIndexes().iterator()) : 
                         Iterators.<PTable>emptyIterator();
         final List<Mutation> mutations = Lists.newArrayListWithExpectedSize(values.size());
         final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
-        Iterator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> iterator = values.entrySet().iterator();
+        Iterator<Map.Entry<ImmutableBytesPtr,RowMutationState>> iterator = values.entrySet().iterator();
         while (iterator.hasNext()) {
-            Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry = iterator.next();
+            Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry = iterator.next();
             ImmutableBytesPtr key = rowEntry.getKey();
             PRow row = tableRef.getTable().newRow(connection.getKeyValueBuilder(), timestamp, key);
             List<Mutation> rowMutations, rowMutationsPertainingToIndex;
-            if (rowEntry.getValue() == PRow.DELETE_MARKER) { // means delete
+            if (rowEntry.getValue().getColumnValues() == PRow.DELETE_MARKER) { // means delete
                 row.delete();
                 rowMutations = row.toRowMutations();
                 // Row deletes for index tables are processed by running a re-written query
@@ -198,7 +192,7 @@ public class MutationState implements SQLCloseable {
                 // delete rows).
                 rowMutationsPertainingToIndex = Collections.emptyList();
             } else {
-                for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().entrySet()) {
+                for (Map.Entry<PColumn,byte[]> valueEntry : rowEntry.getValue().getColumnValues().entrySet()) {
                     row.setValue(valueEntry.getKey(), valueEntry.getValue());
                 }
                 rowMutations = row.toRowMutations();
@@ -250,14 +244,14 @@ public class MutationState implements SQLCloseable {
     }
     
     public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMutableIndexes) {
-        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
+        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
         if (!iterator.hasNext()) {
             return Iterators.emptyIterator();
         }
         Long scn = connection.getSCN();
         final long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
         return new Iterator<Pair<byte[],List<Mutation>>>() {
-            private Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> current = iterator.next();
+            private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
             private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();
                     
             private Iterator<Pair<byte[],List<Mutation>>> init() {
@@ -298,7 +292,7 @@ public class MutationState implements SQLCloseable {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) {
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
             TableRef tableRef = entry.getKey();
             long serverTimeStamp = tableRef.getTimeStamp();
             PTable table = tableRef.getTable();
@@ -313,12 +307,15 @@ public class MutationState implements SQLCloseable {
                         // TODO: use bitset?
                         table = result.getTable();
                         PColumn[] columns = new PColumn[table.getColumns().size()];
-                        for (Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>> rowEntry : entry.getValue().entrySet()) {
-                            Map<PColumn,byte[]> valueEntry = rowEntry.getValue();
-                            if (valueEntry != PRow.DELETE_MARKER) {
-                                for (PColumn column : valueEntry.keySet()) {
-                                    columns[column.getPosition()] = column;
-                                }
+                        for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
+                            RowMutationState valueEntry = rowEntry.getValue();
+                            if (valueEntry != null) {
+                                Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+                                if (colValues != PRow.DELETE_MARKER) {
+                                    for (PColumn column : colValues.keySet()) {
+                                        columns[column.getPosition()] = column;
+                                    }
+                                }
                             }
                         }
                         for (PColumn column : columns) {
@@ -354,15 +351,14 @@ public class MutationState implements SQLCloseable {
         int i = 0;
         byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes();
         long[] serverTimeStamps = validate();
-        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> iterator = this.mutations.entrySet().iterator();
-        List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> committedList = Lists.newArrayListWithCapacity(this.mutations.size());
+        Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator();
 
         // add tracing for this operation
         TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables");
         Span span = trace.getSpan();
         while (iterator.hasNext()) {
-            Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
-            Map<ImmutableBytesPtr,Map<PColumn,byte[]>> valuesMap = entry.getValue();
+            Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next();
+            Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue();
             TableRef tableRef = entry.getKey();
             PTable table = tableRef.getTable();
             table.getIndexMaintainers(tempPtr, connection);
@@ -425,7 +421,6 @@ public class MutationState implements SQLCloseable {
                         MUTATION_COMMIT_TIME.update(duration);
                         shouldRetry = false;
                         if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of  " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection));
-                        committedList.add(entry);
                     } catch (Exception e) {
                         SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
                         if (inferredE != null) {
@@ -446,9 +441,7 @@ public class MutationState implements SQLCloseable {
                             }
                             e = inferredE;
                         }
-                        // Throw to client with both what was committed so far and what is left to be committed.
-                        // That way, client can either undo what was done or try again with what was not done.
-                        sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
+                        sqlE = new CommitException(e, getUncommittedStatementIndexes());
                     } finally {
                         try {
                             hTable.close();
@@ -488,7 +481,64 @@ public class MutationState implements SQLCloseable {
         numRows = 0;
     }
     
+    private int[] getUncommittedStatementIndexes() {
+        int[] result = new int[0];
+        for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) {
+            for (RowMutationState rowMutationState : rowMutations.values()) {
+                result = joinSortedIntArrays(result, rowMutationState.getStatementIndexes());
+            }
+        }
+        return result;
+    }
+    
     @Override
     public void close() throws SQLException {
     }
+    
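+    /**
+     * Merges two sorted int arrays into one sorted array, dropping duplicates both within
+     * and across the inputs. Used to combine the statement indexes of merged row mutations.
+     */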
+    public static int[] joinSortedIntArrays(int[] a, int[] b) {
+        int[] result = new int[a.length + b.length];
+        int i = 0, j = 0, k = 0, current;
+        while (i < a.length && j < b.length) {
+            current = a[i] < b[j] ? a[i++] : b[j++];
+            for ( ; i < a.length && a[i] == current; i++);
+            for ( ; j < b.length && b[j] == current; j++);
+            result[k++] = current;
+        }
+        while (i < a.length) {
+            for (current = a[i++] ; i < a.length && a[i] == current; i++);
+            result[k++] = current;
+        }
+        while (j < b.length) {
+            for (current = b[j++] ; j < b.length && b[j] == current; j++);
+            result[k++] = current;
+        }
+        return Arrays.copyOf(result, k);
+    }
+    
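+    /**
+     * Pairs a row's pending column values with the 0-based indexes of the statements
+     * (since the last commit/rollback) that produced them.
+     */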
+    public static class RowMutationState {
+        private Map<PColumn,byte[]> columnValues;
+        private int[] statementIndexes;
+
+        public RowMutationState(@Nonnull Map<PColumn,byte[]> columnValues, int statementIndex) {
+            Preconditions.checkNotNull(columnValues);
+
+            this.columnValues = columnValues;
+            this.statementIndexes = new int[] {statementIndex};
+        }
+
+        Map<PColumn, byte[]> getColumnValues() {
+            return columnValues;
+        }
+
+        int[] getStatementIndexes() {
+            return statementIndexes;
+        }
+        
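+        // Merges a newer mutation for the same row into this one: new column values
+        // overwrite existing ones, and both mutations' statement indexes are kept.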
+        void join(RowMutationState newRow) {
+            getColumnValues().putAll(newRow.getColumnValues());
+            statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d0a3c63..663a4a8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.CommitException;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.function.FunctionArgumentType;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -122,17 +123,17 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     private final Properties info;
     private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
-    private final MutationState mutationState;
+    private MutationState mutationState;
     private final int mutateBatchSize;
     private final Long scn;
     private boolean isAutoCommit = false;
     private PMetaData metaData;
     private final PName tenantId;
     private final String datePattern;
     private final String timePattern;
     private final String timestampPattern;
+    private int statementExecutionCounter;
     private TraceScope traceScope = null;
-    
     private boolean isClosed = false;
     private Sampler<?> sampler;
     private boolean readOnly = false;
@@ -152,17 +153,20 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         this(connection.getQueryServices(), connection.getURL(), connection.getClientInfo(), connection.getMetaDataCache());
         this.isAutoCommit = connection.isAutoCommit;
         this.sampler = connection.sampler;
+        this.statementExecutionCounter = connection.statementExecutionCounter;
     }
     
     public PhoenixConnection(PhoenixConnection connection, long scn) throws SQLException {
         this(connection.getQueryServices(), connection, scn);
         this.sampler = connection.sampler;
+        this.statementExecutionCounter = connection.statementExecutionCounter;
     }
     
     public PhoenixConnection(ConnectionQueryServices services, PhoenixConnection connection, long scn) throws SQLException {
         this(services, connection.getURL(), newPropsWithSCN(scn,connection.getClientInfo()), connection.getMetaDataCache());
         this.isAutoCommit = connection.isAutoCommit;
         this.sampler = connection.sampler;
+        this.statementExecutionCounter = connection.statementExecutionCounter;
     }
     
     public PhoenixConnection(ConnectionQueryServices services, String url, Properties info, PMetaData metaData) throws SQLException {
@@ -235,7 +239,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
             }
             
         });
-        this.mutationState = new MutationState(maxSize, this);
+        this.mutationState = newMutationState(maxSize);
         this.services.addConnection(this);
 
         // setup tracing, if its enabled
@@ -367,6 +371,10 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
         return metaData;
     }
 
+    protected MutationState newMutationState(int maxSize) {
+        return new MutationState(maxSize, this); 
+    }
+    
     public MutationState getMutationState() {
         return mutationState;
     }
@@ -435,6 +443,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
                 return null;
             }
         }, Tracing.withTracing(this, "committing mutations"));
+        statementExecutionCounter = 0;
     }
 
     @Override
@@ -635,6 +644,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     @Override
     public void rollback() throws SQLException {
         mutationState.rollback(this);
+        statementExecutionCounter = 0;
     }
 
     @Override
@@ -788,6 +798,21 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd
     public KeyValueBuilder getKeyValueBuilder() {
         return this.services.getKeyValueBuilder();
     }
+    
+    /**
+     * Returns the 0-based count of {@link Statement} and {@link PreparedStatement} executions
+     * on this connection since the last commit or rollback. Used to associate partial save
+     * errors with the SQL statements invoked by users.
+     * @see CommitException
+     * @see #incrementStatementExecutionCounter()
+     */
+    public int getStatementExecutionCounter() {
+        return statementExecutionCounter;
+    }
+    
+    public void incrementStatementExecutionCounter() {
+        statementExecutionCounter++;
+    }
 
     public TraceScope getTraceScope() {
         return traceScope;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index 6a65f1d..13de06f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -54,8 +54,8 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
 import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.Sequence;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.SQLCloseable;
 
@@ -79,8 +79,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
 
     private final String query;
 
-    public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException,
-            IOException {
+    public PhoenixPreparedStatement(PhoenixConnection connection, PhoenixStatementParser parser) throws SQLException, IOException {
         super(connection);
         this.statement = parser.nextStatement(new ExecutableNodeFactory());
         if (this.statement == null) { throw new EOFException(); }
@@ -89,7 +88,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
         this.parameters = Arrays.asList(new Object[statement.getBindCount()]);
         Collections.fill(parameters, BindManager.UNBOUND_PARAMETER);
     }
 
     public PhoenixPreparedStatement(PhoenixConnection connection, String query) throws SQLException {
         super(connection);
         this.query = query;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 21b641b..ca110fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -156,6 +156,7 @@ import com.google.common.collect.Lists;
  * @since 0.1
  */
 public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
     private static final Logger logger = LoggerFactory.getLogger(PhoenixStatement.class);
     
     public enum Operation {
@@ -250,6 +251,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                         setLastResultSet(rs);
                         setLastUpdateCount(NO_UPDATE);
                         setLastUpdateOperation(stmt.getOperation());
+                        connection.incrementStatementExecutionCounter();
                         return rs;
                     } catch (RuntimeException e) {
                         // FIXME: Expression.evaluate does not throw SQLException
@@ -302,6 +304,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                                 int lastUpdateCount = (int) Math.min(Integer.MAX_VALUE, state.getUpdateCount());
                                 setLastUpdateCount(lastUpdateCount);
                                 setLastUpdateOperation(stmt.getOperation());
+                                connection.incrementStatementExecutionCounter();
                                 return lastUpdateCount;
                             } catch (RuntimeException e) {
                                 // FIXME: Expression.evaluate does not throw SQLException

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
new file mode 100644
index 0000000..67c3353
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/execute/MutationStateTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you maynot use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import static org.apache.phoenix.execute.MutationState.joinSortedIntArrays;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class MutationStateTest {
+
+    @Test
+    public void testJoinIntArrays() {
+        // simple case
+        int[] a = new int[] {1};
+        int[] b = new int[] {2};
+        int[] result = joinSortedIntArrays(a, b);
+        
+        assertEquals(2, result.length);
+        assertArrayEquals(new int[] {1,2}, result);
+        
+        // empty arrays
+        a = new int[0];
+        b = new int[0];
+        result = joinSortedIntArrays(a, b);
+        
+        assertEquals(0, result.length);
+        assertArrayEquals(new int[] {}, result);
+        
+        // dupes between arrays
+        a = new int[] {1,2,3};
+        b = new int[] {1,2,4};
+        result = joinSortedIntArrays(a, b);
+        
+        assertEquals(4, result.length);
+        assertArrayEquals(new int[] {1,2,3,4}, result);
+        
+        // dupes within arrays
+        a = new int[] {1,2,2,3};
+        b = new int[] {1,2,4};
+        result = joinSortedIntArrays(a, b);
+        
+        assertEquals(4, result.length);
+        assertArrayEquals(new int[] {1,2,3,4}, result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97f28b1c/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 028cb18..cd337d9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -667,7 +667,7 @@ public abstract class BaseTest {
      * Create a {@link PhoenixTestDriver} and register it.
      * @return an initialized and registered {@link PhoenixTestDriver} 
      */
-    protected static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
+    public static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
         PhoenixTestDriver newDriver = new PhoenixTestDriver(props);
         DriverManager.registerDriver(newDriver);
         Driver oldDriver = DriverManager.getDriver(url);