You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:43:46 UTC

[08/52] [abbrv] phoenix git commit: PHOENIX-1674 Snapshot isolation transaction support through Tephra (James Taylor, Thomas D'Silva)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index b269212..8f5301c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -52,17 +52,28 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
     
     protected final boolean localIndex;
     private final String tableDDLOptions;
+    private final String tableName;
+    private final String indexName;
+    private final String fullTableName;
+    private final String fullIndexName;
 	
-    public MutableIndexIT(boolean localIndex) {
+    public MutableIndexIT(boolean localIndex, boolean transactional) {
 		this.localIndex = localIndex;
 		StringBuilder optionBuilder = new StringBuilder();
+		if (transactional) {
+			optionBuilder.append("TRANSACTIONAL=true");
+		}
 		this.tableDDLOptions = optionBuilder.toString();
+		this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ?  "_TXN" : "");
+        this.indexName = "IDX" + ( transactional ?  "_TXN" : "");
+        this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
 	}
 	
-	@Parameters(name="localIndex = {0}")
+	@Parameters(name="localIndex = {0} , transactional = {1}")
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {     
-                 { false }, {true}
+                 { false, false }, { false, true }, { true, false }, { true, true }
            });
     }
     
@@ -71,12 +82,6 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
     	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-	        
             createMultiCFTestTable(fullTableName, tableDDLOptions);
             populateMultiCFTestTable(fullTableName);
             PreparedStatement stmt = conn.prepareStatement("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName 
@@ -174,11 +179,6 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
 	        conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
 	        query = "SELECT * FROM " + fullTableName;
 	        rs = conn.createStatement().executeQuery(query);
@@ -284,12 +284,6 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-	
 	        // make sure that the tables are empty, but reachable
 	        conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
 	        query = "SELECT * FROM " + fullTableName;
@@ -405,12 +399,6 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-    
 	        // make sure that the tables are empty, but reachable
 	        conn.createStatement().execute(
 	          "CREATE TABLE " + fullTableName
@@ -487,24 +475,20 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
     @Test
     public void testUpsertingNullForIndexedColumns() throws Exception {
     	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String testTableName = tableName + "_" + System.currentTimeMillis();
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexeName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
     		Statement stmt = conn.createStatement();
-    		stmt.execute("CREATE TABLE " + fullTableName + "(v1 VARCHAR PRIMARY KEY, v2 DOUBLE, v3 VARCHAR) "+tableDDLOptions);
-    		stmt.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + "  (v2) INCLUDE(v3)");
+    		stmt.execute("CREATE TABLE " + testTableName + "(v1 VARCHAR PRIMARY KEY, v2 DOUBLE, v3 VARCHAR) "+tableDDLOptions);
+    		stmt.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + testTableName + "  (v2) INCLUDE(v3)");
     		
     		//create a row with value null for indexed column v2
-    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', null, 'abc')");
+    		stmt.executeUpdate("upsert into " + testTableName + " values('cc1', null, 'abc')");
     		conn.commit();
     		
     		//assert values in index table 
-    		rs = stmt.executeQuery("select * from " + fullIndexeName);
+    		rs = stmt.executeQuery("select * from " + fullIndexName);
     		assertTrue(rs.next());
     		assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
     		assertTrue(rs.wasNull());
@@ -513,7 +497,7 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
     		assertFalse(rs.next());
     		
     		//assert values in data table
-    		rs = stmt.executeQuery("select v1, v2, v3 from " + fullTableName);
+    		rs = stmt.executeQuery("select v1, v2, v3 from " + testTableName);
     		assertTrue(rs.next());
     		assertEquals("cc1", rs.getString(1));
     		assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
@@ -522,11 +506,11 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
     		assertFalse(rs.next());
     		
     		//update the previously null value for indexed column v2 to a non-null value 1.23
-    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', 1.23, 'abc')");
+    		stmt.executeUpdate("upsert into " + testTableName + " values('cc1', 1.23, 'abc')");
     		conn.commit();
     		
     		//assert values in data table
-    		rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from " + fullTableName);
+    		rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from " + testTableName);
     		assertTrue(rs.next());
     		assertEquals("cc1", rs.getString(1));
     		assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
@@ -534,7 +518,7 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
     		assertFalse(rs.next());
     		
     		//assert values in index table 
-    		rs = stmt.executeQuery("select * from " + fullIndexeName);
+    		rs = stmt.executeQuery("select * from " + indexName);
     		assertTrue(rs.next());
     		assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
     		assertEquals("cc1", rs.getString(2));
@@ -542,11 +526,11 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
     		assertFalse(rs.next());
     		
     		//update the value for indexed column v2 back to null
-    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', null, 'abc')");
+    		stmt.executeUpdate("upsert into " + testTableName + " values('cc1', null, 'abc')");
     		conn.commit();
     		
     		//assert values in index table 
-    		rs = stmt.executeQuery("select * from " + fullIndexeName);
+    		rs = stmt.executeQuery("select * from " + indexName);
     		assertTrue(rs.next());
     		assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
     		assertTrue(rs.wasNull());
@@ -555,7 +539,7 @@ public class MutableIndexIT extends BaseHBaseManagedTimeIT {
     		assertFalse(rs.next());
     		
     		//assert values in data table
-    		rs = stmt.executeQuery("select v1, v2, v3 from " + fullTableName);
+    		rs = stmt.executeQuery("select v1, v2, v3 from " + testTableName);
     		assertTrue(rs.next());
     		assertEquals("cc1", rs.getString(1));
     		assertEquals(0, Doubles.compare(0, rs.getDouble(2)));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
new file mode 100644
index 0000000..58f6226
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class MutableRollbackIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+	private String tableName1;
+    private String indexName1;
+    private String fullTableName1;
+    private String tableName2;
+    private String indexName2;
+    private String fullTableName2;
+
+	public MutableRollbackIT(boolean localIndex) {
+		this.localIndex = localIndex;
+		this.tableName1 = TestUtil.DEFAULT_DATA_TABLE_NAME + "_1_";
+        this.indexName1 = "IDX1";
+        this.fullTableName1 = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName1);
+        this.tableName2 = TestUtil.DEFAULT_DATA_TABLE_NAME + "_2_";
+        this.indexName2 = "IDX2";
+        this.fullTableName2 = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName2);
+	}
+	
+	@BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+	
+	@Parameters(name="localIndex = {0}")
+    public static Collection<Boolean> data() {
+        return Arrays.asList(new Boolean[] {     
+                 false, true  
+           });
+    }
+	
+    public void testRollbackOfUncommittedExistingKeyValueIndexUpdate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1) INCLUDE(v2)");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName2 + " ON " + fullTableName2 + " (v1) INCLUDE(v2)");
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'y', 'a')");
+            conn.commit();
+            
+            //assert rows exists in fullTableName1
+            ResultSet rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert no rows exists in fullTableName2
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            //assert no rows exists in indexName2
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName2 + ")*/ k, v1 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'y', 'b')");
+            stmt.executeUpdate("upsert into " + fullTableName2 + " values('a', 'b', 'c')");
+            
+            //assert new covered column value 
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("b", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert new covered column value 
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("b", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in fullTableName2
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName2);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertEquals("c", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in " + fullTableName2 + " index table
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName2 + ")*/ k, v1 from " + fullTableName2);
+            assertTrue(rs.next());
+            assertEquals("a", rs.getString(1));
+            assertEquals("b", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert original row exists in fullTableName1
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert original row exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert no rows exists in fullTableName2
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            //assert no rows exists in indexName2
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName2 + ")*/ k, v1 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'z', 'a')");
+            stmt.executeUpdate("upsert into " + fullTableName2 + " values('a', 'b', 'c')");
+            conn.commit();
+
+            assertDataAndIndexRows(stmt);
+            stmt.executeUpdate("delete from " + fullTableName1 + " where  k='x'");
+            stmt.executeUpdate("delete from " + fullTableName2 + " where  v1='b'");
+            
+            //assert no rows exists in fullTableName1
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName1);
+            assertFalse(rs.next());
+            //assert no rows exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertFalse(rs.next());
+
+            //assert no rows exists in fullTableName2
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName2);
+            assertFalse(rs.next());
+            //assert no rows exists in indexName2
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName2 + ")*/ k, v1 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            assertDataAndIndexRows(stmt);
+        } finally {
+            conn.close();
+        }
+    }
+
+	@Test
+    public void testRollbackOfUncommittedExistingRowKeyIndexUpdate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            stmt.execute("CREATE TABLE " + fullTableName2 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1, k)");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName2 + " ON " + fullTableName2 + " (v1, k)");
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'y', 'a')");
+            conn.commit();
+            
+            //assert rows exists in " + fullTableName1 + " 
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertFalse(rs.next());
+            
+            //assert no rows exists in fullTableName2
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            //assert no rows exists in indexName2
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName2 + ")*/ k, v1 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'z', 'a')");
+            stmt.executeUpdate("upsert into " + fullTableName2 + " values('a', 'b', 'c')");
+            
+            assertDataAndIndexRows(stmt);
+            
+            conn.rollback();
+            
+            //assert original row exists in fullTableName1
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert original row exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert no rows exists in fullTableName2
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            //assert no rows exists in indexName2
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'z', 'a')");
+            stmt.executeUpdate("upsert into " + fullTableName2 + " values('a', 'b', 'c')");
+            conn.commit();
+
+            assertDataAndIndexRows(stmt);
+            stmt.executeUpdate("delete from " + fullTableName1 + " where  k='x'");
+            stmt.executeUpdate("delete from " + fullTableName2 + " where  v1='b'");
+            
+            //assert no rows exists in fullTableName1
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertFalse(rs.next());
+            //assert no rows exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1);
+            assertFalse(rs.next());
+
+            //assert no rows exists in fullTableName2
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName2);
+            assertFalse(rs.next());
+            //assert no rows exists in indexName2
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName2 + ")*/ k, v1 from " + fullTableName2);
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            assertDataAndIndexRows(stmt);
+
+        } finally {
+            conn.close();
+        }
+    }
+	
+    private void assertDataAndIndexRows(Statement stmt) throws SQLException, IOException {
+        ResultSet rs;
+        //assert new covered row key value exists in fullTableName1
+        rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName1);
+        assertTrue(rs.next());
+        assertEquals("x", rs.getString(1));
+        assertEquals("z", rs.getString(2));
+        assertEquals("a", rs.getString(3));
+        assertFalse(rs.next());
+        
+        //assert new covered row key value exists in indexName1
+        rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1, v2 from " + fullTableName1);
+        assertTrue(rs.next());
+        assertEquals("x", rs.getString(1));
+        assertEquals("z", rs.getString(2));
+        assertEquals("a", rs.getString(3));
+        assertFalse(rs.next());
+        
+        //assert rows exists in fullTableName2
+        rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName2);
+        assertTrue(rs.next());
+        assertEquals("a", rs.getString(1));
+        assertEquals("b", rs.getString(2));
+        assertEquals("c", rs.getString(3));
+        assertFalse(rs.next());
+        
+        //assert rows exists in " + fullTableName2 + " index table
+        rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName2);
+        assertTrue(rs.next());
+        assertEquals("a", rs.getString(1));
+        assertEquals("b", rs.getString(2));
+        assertFalse(rs.next());
+    }
+    
+    @Test
+    public void testMultiRollbackOfUncommittedExistingRowKeyIndexUpdate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1, k)");
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'yyyy', 'a')");
+            conn.commit();
+            
+            //assert rows exists in " + fullTableName1 + " 
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("yyyy", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert rows exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("yyyy", rs.getString(2));
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'zzz', 'a')");
+            
+            //assert new covered row key value exists in fullTableName1
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("zzz", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert new covered row key value exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("zzz", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert original row exists in fullTableName1
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("yyyy", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert original row exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("yyyy", rs.getString(2));
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'zz', 'a')");
+            
+            //assert new covered row key value exists in fullTableName1
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("zz", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert new covered row key value exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("zz", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert original row exists in fullTableName1
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("yyyy", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert original row exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("yyyy", rs.getString(2));
+            assertFalse(rs.next());
+                        
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testCheckpointAndRollback() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + fullTableName1 + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+            stmt.execute("CREATE "+(localIndex? " LOCAL " : "")+"INDEX " + indexName1 + " ON " + fullTableName1 + " (v1)");
+            stmt.executeUpdate("upsert into " + fullTableName1 + " values('x', 'a', 'a')");
+            conn.commit();
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + "(k,v1) SELECT k,v1||'a' FROM " + fullTableName1);
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("aa", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("aa", rs.getString(2));
+            assertFalse(rs.next());
+            
+            stmt.executeUpdate("upsert into " + fullTableName1 + "(k,v1) SELECT k,v1||'a' FROM " + fullTableName1);
+            
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("aaa", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("aaa", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert original row exists in fullTableName1
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName1);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("a", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert original row exists in indexName1
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName1 + ")*/ k, v1 from " + fullTableName1 + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("a", rs.getString(2));
+            assertFalse(rs.next());
+
+        } finally {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
new file mode 100644
index 0000000..dbddcb1
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class RollbackIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+	private final boolean mutable;
+	private final String tableName;
+    private final String indexName;
+    private final String fullTableName;
+
+	public RollbackIT(boolean localIndex, boolean mutable) {
+		this.localIndex = localIndex;
+		this.mutable = mutable;
+		this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
+		this.indexName = "IDX";
+		this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	}
+	
+	@BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+	
+	@Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false }, { false, true },
+                 { true, false }, { true, true }  
+           });
+    }
+    
+    @Test
+    public void testRollbackOfUncommittedKeyValueIndexInsert() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)");
+            
+            stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'y', 'a')");
+            
+            //assert values in data table
+            ResultSet rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName + ")*/ k, v1, v2  from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert values in data table
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName);
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName + ")*/ k, v1, v2 from " + fullTableName);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testRollbackOfUncommittedRowKeyIndexInsert() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR, v1 VARCHAR, v2 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2))"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(v1, k)");
+            
+            stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'y', 'a')");
+
+            ResultSet rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName);
+            
+            //assert values in data table
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName + ")*/ k, v1 from " + fullTableName);
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert values in data table
+            rs = stmt.executeQuery("select /*+ NO_INDEX */ k, v1, v2 from " + fullTableName);
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select /*+ INDEX(" + indexName + ")*/ k, v1 from " + fullTableName);
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+}
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
new file mode 100644
index 0000000..0e16d97
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class TxWriteFailureIT extends BaseTest {
+	
+    private static PhoenixTestDriver driver;
+    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+    private static final String SCHEMA_NAME = "S";
+    private static final String DATA_TABLE_NAME = "T";
+    private static final String INDEX_TABLE_NAME = "I";
+    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, DATA_TABLE_NAME);
+    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, INDEX_TABLE_NAME);
+    private static final String ROW_TO_FAIL = "fail";
+    
+    private final boolean localIndex;
+	private final boolean mutable;
+
+	public TxWriteFailureIT(boolean localIndex, boolean mutable) {
+		this.localIndex = localIndex;
+		this.mutable = mutable;
+	}
+
+	@BeforeClass
+	public static void setupCluster() throws Exception {
+		Configuration conf = TEST_UTIL.getConfiguration();
+		setUpConfigForMiniCluster(conf);
+		conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class);
+		conf.setBoolean("hbase.coprocessor.abortonerror", false);
+		conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+		TEST_UTIL.startMiniCluster();
+		String clientPort = TEST_UTIL.getConfiguration().get(
+				QueryServices.ZOOKEEPER_PORT_ATTRIB);
+		url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST
+				+ JDBC_PROTOCOL_SEPARATOR + clientPort
+				+ JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+
+		Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+		// Must update config before starting server
+		props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+		driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+		clusterInitialized = true;
+		setupTxManager();
+	}
+	
+	@Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+                 { false, false }, { false, true }, { true, false }, { true, true }
+           });
+    }
+	
+	@Test
+    public void testIndexTableWriteFailure() throws Exception {
+        helpTestWriteFailure(true);
+	}
+	
+	@Test
+    public void testDataTableWriteFailure() throws Exception {
+        helpTestWriteFailure(false);
+	}
+
+	private void helpTestWriteFailure(boolean indexTableWriteFailure) throws SQLException {
+		ResultSet rs;
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(url, props);
+        conn.setAutoCommit(false);
+        conn.createStatement().execute(
+                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR PRIMARY KEY, v1 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+        conn.createStatement().execute(
+                "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1)");
+        
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?)");
+        // to create a data table write failure set k as the ROW_TO_FAIL, to create an index table write failure set v1 as the ROW_TO_FAIL, 
+        // FailingRegionObserver will throw an exception if the put contains ROW_TO_FAIL
+        stmt.setString(1, !indexTableWriteFailure ? ROW_TO_FAIL : "k1");
+        stmt.setString(2, indexTableWriteFailure ? ROW_TO_FAIL : "k2");
+        stmt.execute();
+        stmt.setString(1, "k2");
+        stmt.setString(2, "v2");
+        stmt.execute();
+        try {
+        	conn.commit();
+        	fail();
+        }
+        catch (Exception e) {
+        	conn.rollback();
+        }
+        stmt.setString(1, "k3");
+        stmt.setString(2, "v3");
+        stmt.execute();
+        //this should pass
+        conn.commit();
+        
+        // verify that only k3,v3 exists in the data table
+        String dataSql = "SELECT k, v1 FROM " + DATA_TABLE_FULL_NAME + " order by k";
+        rs = conn.createStatement().executeQuery("EXPLAIN "+dataSql);
+        assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER S.T",
+                QueryUtil.getExplainPlan(rs));
+        rs = conn.createStatement().executeQuery(dataSql);
+        assertTrue(rs.next());
+        assertEquals("k3", rs.getString(1));
+        assertEquals("v3", rs.getString(2));
+        assertFalse(rs.next());
+
+        // verify the only k3,v3  exists in the index table
+        String indexSql = "SELECT k, v1 FROM " + DATA_TABLE_FULL_NAME + " order by v1";
+        rs = conn.createStatement().executeQuery("EXPLAIN "+indexSql);
+        if(localIndex) {
+            assertEquals(
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + DATA_TABLE_FULL_NAME + " [-32768]\n" + 
+                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                "CLIENT MERGE SORT",
+                QueryUtil.getExplainPlan(rs));
+        } else {
+	        assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + INDEX_TABLE_FULL_NAME + "\n    SERVER FILTER BY FIRST KEY ONLY",
+	                QueryUtil.getExplainPlan(rs));
+        }
+        rs = conn.createStatement().executeQuery(indexSql);
+        assertTrue(rs.next());
+        assertEquals("k3", rs.getString(1));
+        assertEquals("v3", rs.getString(2));
+        assertFalse(rs.next());
+        
+        conn.createStatement().execute("DROP TABLE " + DATA_TABLE_FULL_NAME);
+	}
+	
+	
+	public static class FailingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+                final Durability durability) throws HBaseIOException {
+            if (shouldFailUpsert(c, put)) {
+                // throwing anything other than instances of IOException result
+                // in this coprocessor being unloaded
+                // DoNotRetryIOException tells HBase not to retry this mutation
+                // multiple times
+                throw new DoNotRetryIOException();
+            }
+        }
+        
+        private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+            return Bytes.contains(put.getRow(), Bytes.toBytes(ROW_TO_FAIL));
+        }
+        
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index e0f0a3c..4b06834 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -22,8 +22,6 @@ package org.apache.phoenix.execute;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Sets.newHashSet;
 import static java.util.Collections.singletonList;
-import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -39,6 +37,8 @@ import java.sql.Driver;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -57,38 +58,37 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
-@Category(NeedsOwnMiniClusterTest.class)
-public class PartialCommitIT {
+@RunWith(Parameterized.class)
+public class PartialCommitIT extends BaseTest {
     
-    private static final String TABLE_NAME_TO_FAIL = "b_failure_table".toUpperCase();
-    private static final byte[] ROW_TO_FAIL = Bytes.toBytes("fail me");
-    private static final String UPSERT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')";
-    private static final String UPSERT_SELECT_TO_FAIL = "upsert into " + TABLE_NAME_TO_FAIL + " select k, c from a_success_table";
-    private static final String DELETE_TO_FAIL = "delete from " + TABLE_NAME_TO_FAIL + "  where k='z'";
+	private final String A_SUCESS_TABLE;
+	private final String B_FAILURE_TABLE;
+	private final String C_SUCESS_TABLE;
+    private final String UPSERT_TO_FAIL;
+    private final String UPSERT_SELECT_TO_FAIL;
+    private final String DELETE_TO_FAIL;
+    private static final String TABLE_NAME_TO_FAIL = "B_FAILURE_TABLE";
+    private static final byte[] ROW_TO_FAIL_UPSERT_BYTES = Bytes.toBytes("fail me upsert");
+    private static final byte[] ROW_TO_FAIL_DELETE_BYTES = Bytes.toBytes("fail me delete");
     private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-    private static String url;
     private static Driver driver;
-    private static final Properties props = new Properties();
-    
-    static {
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
-    }
     
     @BeforeClass
     public static void setupCluster() throws Exception {
@@ -104,29 +104,50 @@ public class PartialCommitIT {
 
       Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
       // Must update config before starting server
-      props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-      driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+      driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
+      clusterInitialized = true;
+      setupTxManager();
       createTablesWithABitOfData();
     }
     
+    @Parameters(name="transactional = {0}")
+    public static Collection<Boolean> data() {
+        return Arrays.asList(false, true);
+    }
+    
+    public PartialCommitIT(boolean transactional) {
+		if (transactional) {
+			A_SUCESS_TABLE = "A_SUCCESS_TABLE_TXN";
+			B_FAILURE_TABLE = TABLE_NAME_TO_FAIL+"_TXN";
+			C_SUCESS_TABLE = "C_SUCCESS_TABLE_TXN";
+		}
+		else {
+			A_SUCESS_TABLE = "A_SUCCESS_TABLE";
+			B_FAILURE_TABLE = TABLE_NAME_TO_FAIL;
+			C_SUCESS_TABLE = "C_SUCCESS_TABLE";
+		}
+	    UPSERT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')";
+	    UPSERT_SELECT_TO_FAIL = "upsert into " + B_FAILURE_TABLE + " select k, c from a_success_table";
+	    DELETE_TO_FAIL = "delete from " + B_FAILURE_TABLE + "  where k='" + Bytes.toString(ROW_TO_FAIL_DELETE_BYTES) + "'";
+	}
+    
     private static void createTablesWithABitOfData() throws Exception {
-        Properties props = new Properties();
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 10);
-
         try (Connection con = driver.connect(url, new Properties())) {
             Statement sta = con.createStatement();
             sta.execute("create table a_success_table (k varchar primary key, c varchar)");
             sta.execute("create table b_failure_table (k varchar primary key, c varchar)");
             sta.execute("create table c_success_table (k varchar primary key, c varchar)");
+            sta.execute("create table a_success_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true");
+            sta.execute("create table b_failure_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true");
+            sta.execute("create table c_success_table_txn (k varchar primary key, c varchar) TRANSACTIONAL=true");
             con.commit();
         }
 
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
-
         try (Connection con = driver.connect(url, new Properties())) {
             con.setAutoCommit(false);
             Statement sta = con.createStatement();
-            for (String table : newHashSet("a_success_table", TABLE_NAME_TO_FAIL, "c_success_table")) {
+            for (String table : newHashSet("a_success_table", "b_failure_table", "c_success_table", 
+            		"a_success_table_txn", "b_failure_table_txn", "c_success_table_txn")) {
                 sta.execute("upsert into " + table + " values ('z', 'z')");
                 sta.execute("upsert into " + table + " values ('zz', 'zz')");
                 sta.execute("upsert into " + table + " values ('zzz', 'zzz')");
@@ -142,46 +163,44 @@ public class PartialCommitIT {
     
     @Test
     public void testNoFailure() {
-        testPartialCommit(singletonList("upsert into a_success_table values ('testNoFailure', 'a')"), 0, new int[0], false,
-                                        singletonList("select count(*) from a_success_table where k='testNoFailure'"), singletonList(new Integer(1)));
+        testPartialCommit(singletonList("upsert into " + A_SUCESS_TABLE + " values ('testNoFailure', 'a')"), 0, new int[0], false,
+                                        singletonList("select count(*) from " + A_SUCESS_TABLE + " where k='testNoFailure'"), singletonList(new Integer(1)));
     }
     
     @Test
     public void testUpsertFailure() {
-        testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertFailure1', 'a')", 
+        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure1', 'a')", 
                                        UPSERT_TO_FAIL, 
-                                       "upsert into a_success_table values ('testUpsertFailure2', 'b')"), 
+                                       "upsert into " + A_SUCESS_TABLE + " values ('testUpsertFailure2', 'b')"), 
                                        1, new int[]{1}, true,
-                                       newArrayList("select count(*) from a_success_table where k like 'testUpsertFailure_'",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                       newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testUpsertFailure_'",
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
                                        newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
     public void testUpsertSelectFailure() throws SQLException {
-        props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 100);
-
         try (Connection con = driver.connect(url, new Properties())) {
-            con.createStatement().execute("upsert into a_success_table values ('" + Bytes.toString(ROW_TO_FAIL) + "', 'boom!')");
+            con.createStatement().execute("upsert into " + A_SUCESS_TABLE + " values ('" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "', 'boom!')");
             con.commit();
         }
         
-        testPartialCommit(newArrayList("upsert into a_success_table values ('testUpsertSelectFailure', 'a')", 
+        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testUpsertSelectFailure', 'a')", 
                                        UPSERT_SELECT_TO_FAIL), 
                                        1, new int[]{1}, true,
-                                       newArrayList("select count(*) from a_success_table where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL) + "')",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                       newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k in ('testUpsertSelectFailure', '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "')",
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
                                        newArrayList(new Integer(2), new Integer(0)));
     }
     
     @Test
     public void testDeleteFailure() {
-        testPartialCommit(newArrayList("upsert into a_success_table values ('testDeleteFailure1', 'a')", 
+        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure1', 'a')", 
                                        DELETE_TO_FAIL,
-                                       "upsert into a_success_table values ('testDeleteFailure2', 'b')"), 
+                                       "upsert into " + A_SUCESS_TABLE + " values ('testDeleteFailure2', 'b')"), 
                                        1, new int[]{1}, true,
-                                       newArrayList("select count(*) from a_success_table where k like 'testDeleteFailure_'",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), 
+                                       newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k like 'testDeleteFailure_'",
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), 
                                        newArrayList(new Integer(2), new Integer(1)));
     }
     
@@ -190,27 +209,27 @@ public class PartialCommitIT {
      */
     @Test
     public void testOrderOfMutationsIsPredicatable() {
-        testPartialCommit(newArrayList("upsert into c_success_table values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
+        testPartialCommit(newArrayList("upsert into " + C_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'c')", // will fail because c_success_table is after b_failure_table by table sort order
                                        UPSERT_TO_FAIL, 
-                                       "upsert into a_success_table values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
+                                       "upsert into " + A_SUCESS_TABLE + " values ('testOrderOfMutationsIsPredicatable', 'a')"), // will succeed because a_success_table is before b_failure_table by table sort order
                                        2, new int[]{0,1}, true,
-                                       newArrayList("select count(*) from c_success_table where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable'",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + Bytes.toString(ROW_TO_FAIL) + "'"), 
+                                       newArrayList("select count(*) from " + C_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
+                                                    "select count(*) from " + A_SUCESS_TABLE + " where k='testOrderOfMutationsIsPredicatable'",
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'"), 
                                        newArrayList(new Integer(0), new Integer(1), new Integer(0)));
     }
     
     @Test
-    public void checkThatAllStatementTypesMaintainOrderInConnection() {
-        testPartialCommit(newArrayList("upsert into a_success_table values ('k', 'checkThatAllStatementTypesMaintainOrderInConnection')", 
-                                       "upsert into a_success_table select k, c from c_success_table",
+    public void testStatementOrderMaintainedInConnection() {
+        testPartialCommit(newArrayList("upsert into " + A_SUCESS_TABLE + " values ('testStatementOrderMaintainedInConnection', 'a')", 
+                                       "upsert into " + A_SUCESS_TABLE + " select k, c from " + C_SUCESS_TABLE,
                                        DELETE_TO_FAIL,
-                                       "select * from a_success_table", 
+                                       "select * from " + A_SUCESS_TABLE + "", 
                                        UPSERT_TO_FAIL), 
                                        2, new int[]{2,4}, true,
-                                       newArrayList("select count(*) from a_success_table where k='testOrderOfMutationsIsPredicatable' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = '" + ROW_TO_FAIL + "'",
-                                                    "select count(*) from " + TABLE_NAME_TO_FAIL + " where k = 'z'"), 
+                                       newArrayList("select count(*) from " + A_SUCESS_TABLE + " where k='testStatementOrderMaintainedInConnection' or k like 'z%'", // rows left: zz, zzz, checkThatAllStatementTypesMaintainOrderInConnection
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = '" + Bytes.toString(ROW_TO_FAIL_UPSERT_BYTES) + "'",
+                                                    "select count(*) from " + B_FAILURE_TABLE + " where k = 'z'"), 
                                        newArrayList(new Integer(4), new Integer(0), new Integer(1)));
     }
     
@@ -259,10 +278,11 @@ public class PartialCommitIT {
         Connection con = driver.connect(url, new Properties());
         PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
         final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
-        return new PhoenixConnection(phxCon) {
+        // passing a null mutation staate forces the connection.newMutationState() to be used to create the MutationState
+        return new PhoenixConnection(phxCon, null) {
             @Override
             protected MutationState newMutationState(int maxSize) {
-                return new MutationState(maxSize, this, mutations);
+                return new MutationState(maxSize, this, mutations, null);
             };
         };
     }
@@ -271,7 +291,7 @@ public class PartialCommitIT {
         @Override
         public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
                 final Durability durability) throws HBaseIOException {
-            if (shouldFailUpsert(c, put)) {
+            if (shouldFail(c, put)) {
                 // throwing anything other than instances of IOException result
                 // in this coprocessor being unloaded
                 // DoNotRetryIOException tells HBase not to retry this mutation
@@ -283,7 +303,7 @@ public class PartialCommitIT {
         @Override
         public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c,
                 Delete delete, WALEdit edit, Durability durability) throws IOException {
-            if (shouldFailDelete(c, delete)) {
+            if (shouldFail(c, delete)) {
                 // throwing anything other than instances of IOException result
                 // in this coprocessor being unloaded
                 // DoNotRetryIOException tells HBase not to retry this mutation
@@ -292,18 +312,13 @@ public class PartialCommitIT {
             }
         }
         
-        private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+        private boolean shouldFail(ObserverContext<RegionCoprocessorEnvironment> c, Mutation m) {
             String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-            return TABLE_NAME_TO_FAIL.equals(tableName) && Bytes.equals(ROW_TO_FAIL, put.getRow());
+            // deletes on transactional tables are converted to put, so use a single helper method
+            return tableName.contains(TABLE_NAME_TO_FAIL) && 
+            		(Bytes.equals(ROW_TO_FAIL_UPSERT_BYTES, m.getRow()) || Bytes.equals(ROW_TO_FAIL_DELETE_BYTES, m.getRow()));
         }
         
-        private boolean shouldFailDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete) {
-            String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-            return TABLE_NAME_TO_FAIL.equals(tableName) &&  
-                // Phoenix deletes are sent as Mutations with empty values
-                delete.getFamilyCellMap().firstEntry().getValue().get(0).getValueLength() == 0 &&
-                delete.getFamilyCellMap().firstEntry().getValue().get(0).getQualifierLength() == 0;
-        }
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
index b7f880a..98932e1 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheIT.java
@@ -20,9 +20,8 @@ package org.apache.phoenix.rpc;
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
 import static org.apache.phoenix.util.TestUtil.MUTABLE_INDEX_DATA_TABLE;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
+import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
@@ -30,33 +29,26 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-import java.math.BigDecimal;
 import java.sql.Connection;
-import java.sql.Date;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.google.common.collect.Maps;
-
 /**
  * Verifies the number of rpcs calls from {@link MetaDataClient} updateCache() 
  * for transactional and non-transactional tables.
@@ -68,86 +60,76 @@ public class UpdateCacheIT extends BaseHBaseManagedTimeIT {
     @Before
     public void setUp() throws SQLException {
         ensureTableCreated(getUrl(), MUTABLE_INDEX_DATA_TABLE);
+        ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
     }
 
-	@BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    @Test
+    public void testUpdateCacheForTxnTable() throws Exception {
+        helpTestUpdateCache(true, null);
     }
-	
-	public static void validateRowKeyColumns(ResultSet rs, int i) throws SQLException {
-		assertTrue(rs.next());
-		assertEquals(rs.getString(1), "varchar" + String.valueOf(i));
-		assertEquals(rs.getString(2), "char" + String.valueOf(i));
-		assertEquals(rs.getInt(3), i);
-		assertEquals(rs.getInt(4), i);
-		assertEquals(rs.getBigDecimal(5), new BigDecimal(i*0.5d));
-		Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * NUM_MILLIS_IN_DAY);
-		assertEquals(rs.getDate(6), date);
-	}
-	
-	public static void setRowKeyColumns(PreparedStatement stmt, int i) throws SQLException {
-        // insert row
-        stmt.setString(1, "varchar" + String.valueOf(i));
-        stmt.setString(2, "char" + String.valueOf(i));
-        stmt.setInt(3, i);
-        stmt.setLong(4, i);
-        stmt.setBigDecimal(5, new BigDecimal(i*0.5d));
-        Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * NUM_MILLIS_IN_DAY);
-        stmt.setDate(6, date);
+    
+    @Test
+    public void testUpdateCacheForNonTxnTable() throws Exception {
+        helpTestUpdateCache(false, null);
     }
 	
-	@Test
-	public void testUpdateCache() throws Exception {
-		String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + MUTABLE_INDEX_DATA_TABLE;
+	public static void helpTestUpdateCache(boolean isTransactional, Long scn) throws Exception {
+	    String tableName = isTransactional ? TRANSACTIONAL_DATA_TABLE : MUTABLE_INDEX_DATA_TABLE;
+	    String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + tableName;
 		String selectSql = "SELECT * FROM "+fullTableName;
 		// use a spyed ConnectionQueryServices so we can verify calls to getTable
 		ConnectionQueryServices connectionQueryServices = Mockito.spy(driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)));
 		Properties props = new Properties();
 		props.putAll(PhoenixEmbeddedDriver.DEFFAULT_PROPS.asMap());
+		if (scn!=null) {
+            props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn));
+        }
 		Connection conn = connectionQueryServices.connect(getUrl(), props);
 		try {
 			conn.setAutoCommit(false);
-			ResultSet rs = conn.createStatement().executeQuery(selectSql);
-	     	assertFalse(rs.next());
-	     	reset(connectionQueryServices);
-	     	
 	        String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
 	        PreparedStatement stmt = conn.prepareStatement(upsert);
 			// upsert three rows
-	        setRowKeyColumns(stmt, 1);
+	        TestUtil.setRowKeyColumns(stmt, 1);
 			stmt.execute();
-			setRowKeyColumns(stmt, 2);
+			TestUtil.setRowKeyColumns(stmt, 2);
 			stmt.execute();
-			setRowKeyColumns(stmt, 3);
+			TestUtil.setRowKeyColumns(stmt, 3);
 			stmt.execute();
 			conn.commit();
-			// verify only one rpc to getTable occurs after commit is called
-			verify(connectionQueryServices, times(1)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(MUTABLE_INDEX_DATA_TABLE)), anyLong(), anyLong());
-			reset(connectionQueryServices);
+			// verify only one rpc to fetch table metadata, 
+            verify(connectionQueryServices).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
+            reset(connectionQueryServices);
+            
+            if (scn!=null) {
+                // advance scn so that we can see the data we just upserted
+                props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(scn+2));
+                conn = connectionQueryServices.connect(getUrl(), props);
+            }
 			
-			rs = conn.createStatement().executeQuery(selectSql);
-			validateRowKeyColumns(rs, 1);
-			validateRowKeyColumns(rs, 2);
-			validateRowKeyColumns(rs, 3);
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+			TestUtil.validateRowKeyColumns(rs, 1);
+			TestUtil.validateRowKeyColumns(rs, 2);
+			TestUtil.validateRowKeyColumns(rs, 3);
 	        assertFalse(rs.next());
 	        
 	        rs = conn.createStatement().executeQuery(selectSql);
-	        validateRowKeyColumns(rs, 1);
-	        validateRowKeyColumns(rs, 2);
-	        validateRowKeyColumns(rs, 3);
+	        TestUtil.validateRowKeyColumns(rs, 1);
+	        TestUtil.validateRowKeyColumns(rs, 2);
+	        TestUtil.validateRowKeyColumns(rs, 3);
 	        assertFalse(rs.next());
 	        
 	        rs = conn.createStatement().executeQuery(selectSql);
-	        validateRowKeyColumns(rs, 1);
-	        validateRowKeyColumns(rs, 2);
-	        validateRowKeyColumns(rs, 3);
+	        TestUtil.validateRowKeyColumns(rs, 1);
+	        TestUtil.validateRowKeyColumns(rs, 2);
+	        TestUtil.validateRowKeyColumns(rs, 3);
 	        assertFalse(rs.next());
-	        conn.commit();
-	        // there should be one rpc to getTable per query
-	        verify(connectionQueryServices, times(3)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(MUTABLE_INDEX_DATA_TABLE)), anyLong(), anyLong());
+	        
+	        // for non-transactional tables without a scn : verify one rpc to getTable occurs *per* query
+            // for non-transactional tables with a scn : verify *only* one rpc occurs
+            // for transactional tables : verify *only* one rpc occurs
+            int numRpcs = isTransactional || scn!=null ? 1 : 3; 
+            verify(connectionQueryServices, times(numRpcs)).getTable((PName)isNull(), eq(PVarchar.INSTANCE.toBytes(INDEX_DATA_SCHEMA)), eq(PVarchar.INSTANCE.toBytes(tableName)), anyLong(), anyLong());
 		}
         finally {
         	conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
new file mode 100644
index 0000000..dbc7fd1
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/rpc/UpdateCacheWithScnIT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.rpc;
+
+import static org.apache.phoenix.util.TestUtil.MUTABLE_INDEX_DATA_TABLE;
+
+import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
+import org.junit.Before;
+import org.junit.Test;
+
+public class UpdateCacheWithScnIT extends BaseClientManagedTimeIT {
+	
+	protected long ts;
+
+	@Before
+	public void initTable() throws Exception {
+		ts = nextTimestamp();
+		ensureTableCreated(getUrl(), MUTABLE_INDEX_DATA_TABLE, ts);
+	}
+	
+	@Test
+	public void testUpdateCacheWithScn() throws Exception {
+		UpdateCacheIT.helpTestUpdateCache(false, ts+2);
+	}
+
+}