You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by td...@apache.org on 2015/10/26 23:41:46 UTC

[2/4] phoenix git commit: Increase testing around transaction integration PHOENIX-1900

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
new file mode 100644
index 0000000..127c988
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.primitives.Doubles;
+
+@RunWith(Parameterized.class)
+public class MutableIndexIT extends BaseHBaseManagedTimeIT {
+    
+    protected final boolean localIndex;
+    private final String tableDDLOptions;
+	
+    /**
+     * @param localIndex    when true, the tests create their index with the LOCAL keyword
+     * @param transactional when true, TRANSACTIONAL=true is appended to each test's table DDL
+     */
+    public MutableIndexIT(boolean localIndex, boolean transactional) {
+        this.localIndex = localIndex;
+        this.tableDDLOptions = transactional ? "TRANSACTIONAL=true" : "";
+    }
+	
+	@Parameters(name="localIndex = {0} , transactional = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false }, { false, true }, { true, false }, { true, true }
+           });
+    }
+    /** Verifies that a covered index (global or local, per test parameter) reflects UPSERT SELECT changes to the INCLUDEd column long_col2, including setting it to null. */
+    @Test
+    public void testCoveredColumnUpdates() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        // NOTE(review): the two helpers below are not defined in this file (presumably inherited) - confirm the rows they load
+            createMultiCFTestTable(fullTableName, tableDDLOptions);
+            populateMultiCFTestTable(fullTableName);
+            PreparedStatement stmt = conn.prepareStatement("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName 
+            		+ " (char_col1 ASC, int_col1 ASC) INCLUDE (long_col1, long_col2)");
+            stmt.execute();
+            // the query only touches indexed/included columns, so the plan is expected to scan the index
+            String query = "SELECT char_col1, int_col1, long_col2 from " + fullTableName;
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+            if (localIndex) {
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName +" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+            }
+            // baseline: three rows, before any update
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertEquals(3L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertEquals(4L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+            assertEquals(5L, rs.getLong(3));
+            assertFalse(rs.next());
+            // double long_col2 for the single row whose long_col2 is 4
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+                    + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2*2 FROM "
+                    + fullTableName + " WHERE long_col2=?");
+            stmt.setLong(1,4L);
+            assertEquals(1,stmt.executeUpdate());
+            conn.commit();
+            // only the middle row changes: 4 -> 8
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertEquals(3L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertEquals(8L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+            assertEquals(5L, rs.getLong(3));
+            assertFalse(rs.next());
+            // set long_col2 to null for the single row whose long_col2 is 3
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName
+                    + "(varchar_pk, char_pk, int_pk, long_pk , decimal_pk, long_col2) SELECT varchar_pk, char_pk, int_pk, long_pk , decimal_pk, null FROM "
+                    + fullTableName + " WHERE long_col2=?");
+            stmt.setLong(1,3L);
+            assertEquals(1,stmt.executeUpdate());
+            conn.commit();
+            // getLong() returns 0 for SQL NULL, so wasNull() is checked right after it
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            assertEquals(0, rs.getLong(3));
+            assertTrue(rs.wasNull());
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            assertEquals(8L, rs.getLong(3));
+            assertTrue(rs.next());
+            assertEquals("chara", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+            assertEquals(5L, rs.getLong(3));
+            assertFalse(rs.next());
+            if(localIndex) {
+                query = "SELECT b.* from " + fullTableName + " where int_col1 = 4";
+                rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+                assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName +" [-32768]\n" +
+                		"    SERVER FILTER BY TO_INTEGER(\"INT_COL1\") = 4\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+                rs = conn.createStatement().executeQuery(query);
+                assertTrue(rs.next());
+                assertEquals("varchar_b", rs.getString(1));
+                assertEquals("charb", rs.getString(2));
+                assertEquals(5, rs.getInt(3));
+                assertEquals(5, rs.getLong(4));
+                assertFalse(rs.next());
+                
+            }
+        } 
+    }
+    /** Verifies that an index on v1 that INCLUDEs v2 follows successive updates of v2 (null, "3", "4") and that SELECT * on the data table is planned against that index. */
+    @Test
+    public void testCoveredColumns() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	        conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        // index on v1 that also carries v2; it must start out empty
+	        conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        // insert one complete row
+	        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	        // SELECT * on the index is asserted in the order v1, k, v2
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x",rs.getString(1));
+	        assertEquals("a",rs.getString(2));
+	        assertEquals("1",rs.getString(3));
+	        assertFalse(rs.next());
+	        // overwrite the covered column v2 with null
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, null);
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x",rs.getString(1));
+	        assertEquals("a",rs.getString(2));
+	        assertNull(rs.getString(3));
+	        assertFalse(rs.next());
+	        // SELECT * on the data table is expected to be planned against the covering index
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+	        }
+	
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("x",rs.getString(2));
+	        assertNull(rs.getString(3));
+	        assertFalse(rs.next());
+	        // give v2 a value again
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2,"3");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName + " [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+	        }
+	        
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("x",rs.getString(2));
+	        assertEquals("3",rs.getString(3));
+	        assertFalse(rs.next());
+	        // and overwrite that non-null value with another one
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2,"4");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\nCLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));            
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName, QueryUtil.getExplainPlan(rs));
+	        }
+	        
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("x",rs.getString(2));
+	        assertEquals("4",rs.getString(3));
+	        assertFalse(rs.next());
+        }
+    }
+    /** Verifies a two-column (v1, v2) index when either indexed column is null and when a null leading column later receives a value. */
+    @Test
+    public void testCompoundIndexKey() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+	
+	        // make sure that the tables are empty, but reachable
+	        conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	
+	        // load some data into the table
+	        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	        // SELECT * on the index is asserted in the order v1, v2, k
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x",rs.getString(1));
+	        assertEquals("1",rs.getString(2));
+	        assertEquals("a",rs.getString(3));
+	        assertFalse(rs.next());
+	        // change v1 and null out v2 for the same key; the index must end up with exactly one row
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1,"a");
+	        stmt.setString(2, "y");
+	        stmt.setString(3, null);
+	        stmt.execute();
+	        conn.commit();
+	        
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("y",rs.getString(1));
+	        assertNull(rs.getString(2));
+	        assertEquals("a",rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if (localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\n"
+	                    + "    SERVER FILTER BY FIRST KEY ONLY\n"
+	                    + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+	                       + "    SERVER FILTER BY FIRST KEY ONLY", QueryUtil.getExplainPlan(rs));
+	        }
+	        //make sure the data table looks like what we expect
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a",rs.getString(1));
+	        assertEquals("y",rs.getString(2));
+	        assertNull(rs.getString(3));
+	        assertFalse(rs.next());
+	        // stmt is still the three-parameter UPSERT prepared above
+	        // Upsert new row with null leading index column
+	        stmt.setString(1,"b");
+	        stmt.setString(2, null);
+	        stmt.setString(3, "3");
+	        stmt.execute();
+	        conn.commit();
+	        // the row whose leading index column is null is asserted first
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals(null,rs.getString(1));
+	        assertEquals("3",rs.getString(2));
+	        assertEquals("b",rs.getString(3));
+	        assertTrue(rs.next());
+	        assertEquals("y",rs.getString(1));
+	        assertNull(rs.getString(2));
+	        assertEquals("a",rs.getString(3));
+	        assertFalse(rs.next());
+	
+	        // Update row with null leading index column to have a value
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?)");
+	        stmt.setString(1,"b");
+	        stmt.setString(2, "z");
+	        stmt.execute();
+	        conn.commit();
+	        // only two values were supplied; the assertions below expect row "b" to keep v2 = "3"
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("y",rs.getString(1));
+	        assertNull(rs.getString(2));
+	        assertEquals("a",rs.getString(3));
+	        assertTrue(rs.next());
+	        assertEquals("z",rs.getString(1));
+	        assertEquals("3",rs.getString(2));
+	        assertEquals("b",rs.getString(3));
+	        assertFalse(rs.next());
+        }
+
+    }
+    
+    /**
+     * There was a case where, if a single batch held multiple updates to the same row, the index
+     * wasn't updated correctly, because each element of the batch was evaluated against the state
+     * from before the batch rather than together with the rest of the batch. This meant you could
+     * do a put and a delete on a row in the same batch and the index result would contain the
+     * current + put and current + delete, but not current + put + delete.
+     * @throws Exception on failure
+     */
+    @Test
+    public void testMultipleUpdatesToSingleRow() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        String query;
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    
+	        // make sure that the tables are empty, but reachable
+	        conn.createStatement().execute(
+	          "CREATE TABLE " + fullTableName
+	              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + tableDDLOptions);
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        // create the (v1, v2) index, local or global depending on the test parameter
+	        conn.createStatement().execute("CREATE " + (localIndex ? " LOCAL " : "") + " INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	    
+	        // load some data into the table
+	        PreparedStatement stmt =
+	            conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "x");
+	        stmt.setString(3, "1");
+	        stmt.execute();
+	        conn.commit();
+	        
+	        // make sure the index is working as expected
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("x", rs.getString(1));
+	        assertEquals("1", rs.getString(2));
+	        assertEquals("a", rs.getString(3));
+	        assertFalse(rs.next());
+	      
+	        // do multiple updates to the same row, in the same batch
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k, v1) VALUES(?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, "y");
+	        stmt.execute();
+	        stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + "(k,v2) VALUES(?,?)");
+	        stmt.setString(1, "a");
+	        stmt.setString(2, null);
+	        stmt.execute();
+	        conn.commit();
+	        // both upserts were committed together; the index must show their combined effect
+	        query = "SELECT * FROM " + fullIndexName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("y", rs.getString(1));
+	        assertNull(rs.getString(2));
+	        assertEquals("a", rs.getString(3));
+	        assertFalse(rs.next());
+	    
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+	        if(localIndex) {
+	            assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\n"
+	                    + "    SERVER FILTER BY FIRST KEY ONLY\n"
+	                    + "CLIENT MERGE SORT",
+	                QueryUtil.getExplainPlan(rs));
+	        } else {
+	            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullIndexName + "\n"
+	                    + "    SERVER FILTER BY FIRST KEY ONLY",
+	                QueryUtil.getExplainPlan(rs));
+	        }
+	    
+	        // check that the data table matches as expected
+	        rs = conn.createStatement().executeQuery(query);
+	        assertTrue(rs.next());
+	        assertEquals("a", rs.getString(1));
+	        assertEquals("y", rs.getString(2));
+	        assertNull(rs.getString(3));
+	        assertFalse(rs.next());
+        }
+    }
+    /** Verifies that the data table and its index stay consistent while the indexed DOUBLE column v2 goes null -> 1.23 -> null. */
+    @Test
+    public void testUpsertingNullForIndexedColumns() throws Exception {
+    	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        ResultSet rs;
+	        // create unique table and index names for each parameterized test
+	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
+	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    		Statement stmt = conn.createStatement();
+    		stmt.execute("CREATE TABLE " + fullTableName + "(v1 VARCHAR PRIMARY KEY, v2 DOUBLE, v3 VARCHAR) "+tableDDLOptions);
+    		stmt.execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + "  (v2) INCLUDE(v3)");
+    		//create a row with value null for indexed column v2
+    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', null, 'abc')");
+    		conn.commit();
+    		
+    		//assert values in index table 
+    		// getDouble() returns 0 for SQL NULL, so every null check is paired with wasNull()
+    		rs = stmt.executeQuery("select * from " + fullIndexName);
+    		assertTrue(rs.next());
+    		assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
+    		assertTrue(rs.wasNull());
+    		assertEquals("cc1", rs.getString(2));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		//assert values in data table
+    		rs = stmt.executeQuery("select v1, v2, v3 from " + fullTableName);
+    		assertTrue(rs.next());
+    		assertEquals("cc1", rs.getString(1));
+    		assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
+    		assertTrue(rs.wasNull());
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//update the previously null value for indexed column v2 to a non-null value 1.23
+    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', 1.23, 'abc')");
+    		conn.commit();
+    		
+    		//assert values in data table
+    		rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from " + fullTableName);
+    		assertTrue(rs.next());
+    		assertEquals("cc1", rs.getString(1));
+    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//assert values in index table 
+    		rs = stmt.executeQuery("select * from " + fullIndexName);
+    		assertTrue(rs.next());
+    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
+    		assertEquals("cc1", rs.getString(2));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//update the value for indexed column v2 back to null
+    		stmt.executeUpdate("upsert into " + fullTableName + " values('cc1', null, 'abc')");
+    		conn.commit();
+    		
+    		//assert values in index table 
+    		rs = stmt.executeQuery("select * from " + fullIndexName);
+    		assertTrue(rs.next());
+    		assertEquals(0, Doubles.compare(0, rs.getDouble(1)));
+    		assertTrue(rs.wasNull());
+    		assertEquals("cc1", rs.getString(2));
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    		
+    		//assert values in data table
+    		rs = stmt.executeQuery("select v1, v2, v3 from " + fullTableName);
+    		assertTrue(rs.next());
+    		assertEquals("cc1", rs.getString(1));
+    		assertEquals(0, Doubles.compare(0, rs.getDouble(2)));
+    		// without this check a stored 0.0 would be indistinguishable from the expected null
+    		assertTrue(rs.wasNull());
+    		assertEquals("abc", rs.getString(3));
+    		assertFalse(rs.next());
+    	} 
+    }
+    /** Asserts that the table looked up through the PhoenixConnection under fullTableName reports isImmutableRows() == expectedValue. */
+    private void assertImmutableRows(Connection conn, String fullTableName, boolean expectedValue) throws SQLException {
+        PhoenixConnection phoenixConn = conn.unwrap(PhoenixConnection.class);
+        PTableKey tableKey = new PTableKey(phoenixConn.getTenantId(), fullTableName);
+        assertEquals(expectedValue, phoenixConn.getTable(tableKey).isImmutableRows());
+    }
+    /** Verifies that ALTER TABLE ... SET IMMUTABLE_ROWS switches the flag on and back off, with the property name given in upper and in lower case. */
+    @Test
+    public void testAlterTableWithImmutability() throws Exception {
+        String query;
+        ResultSet rs;
+        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
+        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+	        conn.setAutoCommit(false);
+	        conn.createStatement().execute(
+	            "CREATE TABLE " + fullTableName +" (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR) " + tableDDLOptions);
+	        // the new table must be queryable and empty
+	        query = "SELECT * FROM " + fullTableName;
+	        rs = conn.createStatement().executeQuery(query);
+	        assertFalse(rs.next());
+	        // a freshly created table is expected to be mutable
+	        assertImmutableRows(conn,fullTableName, false);
+	        conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET IMMUTABLE_ROWS=true");
+	        assertImmutableRows(conn,fullTableName, true);
+	        
+	        // same property, this time spelled in lower case
+	        conn.createStatement().execute("ALTER TABLE " + fullTableName +" SET immutable_rows=false");
+	        assertImmutableRows(conn,fullTableName, false);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
deleted file mode 100644
index 5196b0a..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.phoenix.end2end.Shadower;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-public class TxGlobalMutableIndexIT extends GlobalMutableIndexIT {
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseMutableIndexIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan
-        // Forces server cache to be used
-        props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
-        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    
-    @Test
-    public void testRollbackOfUncommittedIndexChange() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR)");
-            stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)");
-            
-            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
-            
-            //assert values in data table
-            ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertTrue(rs.next());
-            assertEquals("x", rs.getString(1));
-            assertEquals("y", rs.getString(2));
-            assertEquals("a", rs.getString(3));
-            assertFalse(rs.next());
-            
-            conn.rollback();
-            
-            //assert values in data table
-            rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertFalse(rs.next());
-            
-        } finally {
-            conn.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
deleted file mode 100644
index e9d685f..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxImmutableIndexIT.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end.index;
-
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
-import org.apache.phoenix.end2end.Shadower;
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-public class TxImmutableIndexIT extends ImmutableIndexIT {
-    
-    @BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        // Don't split intra region so we can more easily know that the n-way parallelization is for the explain plan
-        // Forces server cache to be used
-        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
-        // We need this b/c we don't allow a transactional table to be created if the underlying
-        // HBase table already exists (since we don't know if it was transactional before).
-        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
-    
-    // TODO: need test case with mix of mutable and immutable indexes
-    @Test
-    public void testRollbackOfUncommittedKeyValueIndexChange() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 VARCHAR, v3 VARCHAR) IMMUTABLE_ROWS=true");
-            stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)");
-            
-            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
-            
-            //assert values in data table
-            ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertTrue(rs.next());
-            assertEquals("x", rs.getString(1));
-            assertEquals("y", rs.getString(2));
-            assertEquals("a", rs.getString(3));
-            assertFalse(rs.next());
-            
-            conn.rollback();
-            
-            //assert values in data table
-            rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertFalse(rs.next());
-            
-        } finally {
-            conn.close();
-        }
-    }
-    
-    // TODO: need test case with mix of mutable and immutable indexes
-    @Test
-    public void testRollbackOfUncommittedRowKeyIndexChange() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.setAutoCommit(false);
-        try {
-            Statement stmt = conn.createStatement();
-            stmt.execute("CREATE TABLE DEMO(v1 VARCHAR, v2 VARCHAR, v3 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2)) IMMUTABLE_ROWS=true");
-            stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2, v1)");
-            
-            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
-            
-            //assert values in data table
-            ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertTrue(rs.next());
-            assertEquals("x", rs.getString(1));
-            assertEquals("y", rs.getString(2));
-            assertEquals("a", rs.getString(3));
-            assertFalse(rs.next());
-            
-            conn.rollback();
-            
-            //assert values in data table
-            rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
-            assertFalse(rs.next());
-            
-        } finally {
-            conn.close();
-        }
-    }
-	
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
new file mode 100644
index 0000000..4131c2d
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/MutableRollbackIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Verifies that rolling back a transaction that updated an already committed row
+ * restores the committed state, as seen through plain table queries and through
+ * queries ordered by the indexed column. Every test runs twice: once with global
+ * indexes and once with local indexes.
+ */
+@RunWith(Parameterized.class)
+public class MutableRollbackIT extends BaseHBaseManagedTimeIT {
+
+    private final boolean localIndex;
+
+    public MutableRollbackIT(boolean localIndex) {
+        this.localIndex = localIndex;
+    }
+
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        // We need this b/c we don't allow a transactional table to be created if the underlying
+        // HBase table already exists (since we don't know if it was transactional before).
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Parameters(name="localIndex = {0}")
+    public static Collection<Boolean> data() {
+        return Arrays.asList(new Boolean[] {
+                 false, true
+           });
+    }
+
+    @Test
+    public void testRollbackOfUncommittedExistingKeyValueIndexUpdate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            createTablesAndIndexes(stmt, "(v1) INCLUDE(v2)");
+
+            String demo1Data = "select k, v1, v2 from DEMO1";
+            // The ORDER BY v1 queries are meant to be served by the indexes; the query plan itself is not asserted
+            String demo1Index = "select k, v1, v2 from DEMO1 ORDER BY v1";
+            String demo2Data = "select k, v1, v2 from DEMO2";
+            String demo2Index = "select k, v1 from DEMO2 ORDER BY v1";
+
+            stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'a')");
+            conn.commit();
+
+            // committed state: a single row in DEMO1, nothing in DEMO2
+            assertSingleRow(stmt, demo1Data, "x", "y", "a");
+            assertSingleRow(stmt, demo1Index, "x", "y", "a");
+            assertNoRows(stmt, demo2Data);
+            assertNoRows(stmt, demo2Index);
+
+            // change only the covered column of the committed row and add a row to DEMO2
+            stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'b')");
+            stmt.executeUpdate("upsert into DEMO2 values('a', 'b', 'c')");
+
+            // the uncommitted changes are visible inside the transaction
+            assertSingleRow(stmt, demo1Data, "x", "y", "b");
+            assertSingleRow(stmt, demo1Index, "x", "y", "b");
+            assertSingleRow(stmt, demo2Data, "a", "b", "c");
+            assertSingleRow(stmt, demo2Index, "a", "b");
+
+            conn.rollback();
+
+            // the rollback brings back the committed state
+            assertSingleRow(stmt, demo1Data, "x", "y", "a");
+            assertSingleRow(stmt, demo1Index, "x", "y", "a");
+            assertNoRows(stmt, demo2Data);
+            assertNoRows(stmt, demo2Index);
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testRollbackOfUncommittedExistingRowKeyIndexUpdate() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            createTablesAndIndexes(stmt, "(v1, k)");
+
+            String demo1Data = "select k, v1, v2 from DEMO1";
+            // The ORDER BY v1 queries are meant to be served by the indexes; the query plan itself is not asserted
+            String demo1Index = "select k, v1 from DEMO1 ORDER BY v1";
+            String demo2Data = "select k, v1, v2 from DEMO2";
+            String demo2Index = "select k, v1 from DEMO2 ORDER BY v1";
+
+            stmt.executeUpdate("upsert into DEMO1 values('x', 'y', 'a')");
+            conn.commit();
+
+            // committed state: a single row in DEMO1, nothing in DEMO2
+            assertSingleRow(stmt, demo1Data, "x", "y", "a");
+            assertSingleRow(stmt, demo1Index, "x", "y");
+            assertNoRows(stmt, demo2Data);
+            assertNoRows(stmt, demo2Index);
+
+            // change the indexed column of the committed row and add a row to DEMO2
+            stmt.executeUpdate("upsert into DEMO1 values('x', 'z', 'a')");
+            stmt.executeUpdate("upsert into DEMO2 values('a', 'b', 'c')");
+
+            // the uncommitted changes are visible inside the transaction
+            assertSingleRow(stmt, demo1Data, "x", "z", "a");
+            assertSingleRow(stmt, demo1Index, "x", "z");
+            assertSingleRow(stmt, demo2Data, "a", "b", "c");
+            assertSingleRow(stmt, demo2Index, "a", "b");
+
+            conn.rollback();
+
+            // the rollback brings back the committed state
+            assertSingleRow(stmt, demo1Data, "x", "y", "a");
+            assertSingleRow(stmt, demo1Index, "x", "y");
+            assertNoRows(stmt, demo2Data);
+            assertNoRows(stmt, demo2Index);
+        } finally {
+            conn.close();
+        }
+    }
+
+    /** Creates DEMO1 (no IMMUTABLE_ROWS option) and DEMO2 (IMMUTABLE_ROWS=true), each with an index over the given columns. */
+    private void createTablesAndIndexes(Statement stmt, String indexedColumns) throws SQLException {
+        String createIndex = "CREATE "+(localIndex? " LOCAL " : "")+"INDEX ";
+        stmt.execute("CREATE TABLE DEMO1(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+        stmt.execute("CREATE TABLE DEMO2(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) IMMUTABLE_ROWS=true");
+        stmt.execute(createIndex + "DEMO1_idx ON DEMO1 " + indexedColumns);
+        stmt.execute(createIndex + "DEMO2_idx ON DEMO2 " + indexedColumns);
+    }
+
+    /** Asserts that the query returns exactly one row whose leading columns equal {@code expected}. */
+    private static void assertSingleRow(Statement stmt, String query, String... expected) throws SQLException {
+        ResultSet rs = stmt.executeQuery(query);
+        try {
+            assertTrue(rs.next());
+            for (int i = 0; i < expected.length; i++) {
+                assertEquals(expected[i], rs.getString(i + 1));
+            }
+            assertFalse(rs.next());
+        } finally {
+            rs.close();
+        }
+    }
+
+    /** Asserts that the query returns no rows. */
+    private static void assertNoRows(Statement stmt, String query) throws SQLException {
+        ResultSet rs = stmt.executeQuery(query);
+        try {
+            assertFalse(rs.next());
+        } finally {
+            rs.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
new file mode 100644
index 0000000..b7ec3c6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/RollbackIT.java
@@ -0,0 +1,144 @@
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class RollbackIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+	private final boolean mutable;
+
+	public RollbackIT(boolean localIndex, boolean mutable) {
+		this.localIndex = localIndex;
+		this.mutable = mutable;
+	}
+	
+	@BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        // We need this b/c we don't allow a transactional table to be created if the underlying
+        // HBase table already exists (since we don't know if it was transactional before).
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+	
+	@Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false }, { false, true }, { true, false }, { true, true }  
+           });
+    }
+    
+    @Test
+    public void testRollbackOfUncommittedKeyValueIndexInsert() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1) INCLUDE(v2)");
+            
+            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
+            
+            //assert values in data table
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1, v2  from DEMO ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert values in data table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testRollbackOfUncommittedRowKeyIndexInsert() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO(k VARCHAR, v1 VARCHAR, v2 VARCHAR, CONSTRAINT pk PRIMARY KEY (v1, v2))"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1, k)");
+            
+            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
+
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            
+            //assert values in data table
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1 from DEMO ORDER BY v2");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert values in data table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO");
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1 from DEMO ORDER BY v2");
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+}
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
new file mode 100644
index 0000000..205056b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/txn/TxWriteFailureIT.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index.txn;
+
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
+import static org.apache.phoenix.util.TestUtil.LOCALHOST;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Verifies that a failed write to either the data table or the index table makes the
+ * commit fail, and that after a rollback the connection can commit further rows while
+ * nothing from the failed batch is visible. Failures are injected by
+ * {@link FailingRegionObserver}, which rejects every Put whose row key contains
+ * {@code ROW_TO_FAIL}.
+ */
+@RunWith(Parameterized.class)
+public class TxWriteFailureIT extends BaseTest {
+
+    private static PhoenixTestDriver driver;
+    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+    private static final String SCHEMA_NAME = "S";
+    private static final String DATA_TABLE_NAME = "T";
+    private static final String INDEX_TABLE_NAME = "I";
+    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, DATA_TABLE_NAME);
+    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, INDEX_TABLE_NAME);
+    private static final String ROW_TO_FAIL = "fail";
+
+    private final boolean localIndex;
+    private final boolean mutable;
+
+    public TxWriteFailureIT(boolean localIndex, boolean mutable) {
+        this.localIndex = localIndex;
+        this.mutable = mutable;
+    }
+
+    /**
+     * Starts a mini cluster with {@link FailingRegionObserver} configured as a region
+     * coprocessor and registers a test driver whose tables are transactional by default.
+     */
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        Configuration conf = TEST_UTIL.getConfiguration();
+        setUpConfigForMiniCluster(conf);
+        conf.setClass("hbase.coprocessor.region.classes", FailingRegionObserver.class, RegionObserver.class);
+        conf.setBoolean("hbase.coprocessor.abortonerror", false);
+        conf.setBoolean(Indexer.CHECK_VERSION_CONF_KEY, false);
+        TEST_UTIL.startMiniCluster();
+        String clientPort = TEST_UTIL.getConfiguration().get(
+                QueryServices.ZOOKEEPER_PORT_ATTRIB);
+        url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST
+                + JDBC_PROTOCOL_SEPARATOR + clientPort
+                + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
+
+        // Properties handed to the test driver registered below
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(2);
+        props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+        clusterInitialized = true;
+        setupTxManager();
+    }
+
+    @Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {
+                 { false, false }, { false, true }, { true, false }, { true, true }
+           });
+    }
+
+    @Test
+    public void testIndexTableWriteFailure() throws Exception {
+        helpTestWriteFailure(true);
+    }
+
+    @Test
+    public void testDataTableWriteFailure() throws Exception {
+        helpTestWriteFailure(false);
+    }
+
+    /**
+     * Upserts one row that the coprocessor rejects plus one ordinary row, expects the
+     * commit to fail, rolls back, then commits a third row and checks that it is the
+     * only row returned by queries ordered by the primary key and by the indexed column.
+     * The data table is dropped and the connection closed even when an assertion fails.
+     *
+     * @param indexTableWriteFailure {@code true} to place {@code ROW_TO_FAIL} in v1 (index
+     *        table write failure), {@code false} to place it in k (data table write failure)
+     * @throws SQLException if a statement outside the expected failing commit fails
+     */
+    private void helpTestWriteFailure(boolean indexTableWriteFailure) throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = driver.connect(url, props);
+        // Becomes true only after every assertion has passed; consulted by the cleanup below
+        boolean verified = false;
+        try {
+            conn.setAutoCommit(false);
+            conn.createStatement().execute(
+                    "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR PRIMARY KEY, v1 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            conn.createStatement().execute(
+                    "CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1)");
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?)");
+            // to create a data table write failure set k as the ROW_TO_FAIL, to create an index table write failure set v1 as the ROW_TO_FAIL,
+            // FailingRegionObserver will throw an exception if the put contains ROW_TO_FAIL
+            stmt.setString(1, !indexTableWriteFailure ? ROW_TO_FAIL : "k1");
+            stmt.setString(2, indexTableWriteFailure ? ROW_TO_FAIL : "k2");
+            stmt.execute();
+            stmt.setString(1, "k2");
+            stmt.setString(2, "v2");
+            stmt.execute();
+            try {
+                conn.commit();
+                fail();
+            }
+            catch (Exception e) {
+                conn.rollback();
+            }
+            stmt.setString(1, "k3");
+            stmt.setString(2, "v3");
+            stmt.execute();
+            //this should pass
+            conn.commit();
+
+            // verify that only k3,v3 exists in the data table
+            String dataSql = "SELECT k, v1 FROM " + DATA_TABLE_FULL_NAME + " order by k";
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN "+dataSql);
+            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER S.T",
+                    QueryUtil.getExplainPlan(rs));
+            rs = conn.createStatement().executeQuery(dataSql);
+            assertTrue(rs.next());
+            assertEquals("k3", rs.getString(1));
+            assertEquals("v3", rs.getString(2));
+            assertFalse(rs.next());
+
+            // verify that only k3,v3 exists in the index table
+            String indexSql = "SELECT k, v1 FROM " + DATA_TABLE_FULL_NAME + " order by v1";
+            rs = conn.createStatement().executeQuery("EXPLAIN "+indexSql);
+            if(localIndex) {
+                assertEquals(
+                    "CLIENT PARALLEL 1-WAY RANGE SCAN OVER _LOCAL_IDX_" + DATA_TABLE_FULL_NAME + " [-32768]\n" +
+                    "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                    "CLIENT MERGE SORT",
+                    QueryUtil.getExplainPlan(rs));
+            } else {
+                assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + INDEX_TABLE_FULL_NAME + "\n    SERVER FILTER BY FIRST KEY ONLY",
+                        QueryUtil.getExplainPlan(rs));
+            }
+            rs = conn.createStatement().executeQuery(indexSql);
+            assertTrue(rs.next());
+            assertEquals("k3", rs.getString(1));
+            assertEquals("v3", rs.getString(2));
+            assertFalse(rs.next());
+            verified = true;
+        } finally {
+            try {
+                // Every parameter combination reuses this table name, so drop it even after a failure
+                conn.createStatement().execute("DROP TABLE " + DATA_TABLE_FULL_NAME);
+            } catch (SQLException e) {
+                // A cleanup error is reported only when it cannot hide an earlier test failure
+                if (verified) {
+                    throw e;
+                }
+            } finally {
+                conn.close();
+            }
+        }
+    }
+
+    /** Region observer that rejects any Put whose row key contains {@code ROW_TO_FAIL}. */
+    public static class FailingRegionObserver extends SimpleRegionObserver {
+        @Override
+        public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+                final Durability durability) throws HBaseIOException {
+            if (shouldFailUpsert(c, put)) {
+                // throwing anything other than instances of IOException result
+                // in this coprocessor being unloaded
+                // DoNotRetryIOException tells HBase not to retry this mutation
+                // multiple times
+                throw new DoNotRetryIOException();
+            }
+        }
+
+        private boolean shouldFailUpsert(ObserverContext<RegionCoprocessorEnvironment> c, Put put) {
+            return Bytes.contains(put.getRow(), Bytes.toBytes(ROW_TO_FAIL));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index 38e8ae7..52b5b5f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -10,6 +10,7 @@
 package org.apache.phoenix.tx;
 
 import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -21,15 +22,18 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTableKey;
-import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Before;
@@ -46,13 +50,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
     public void setUp() throws SQLException {
         ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
     }
-
-	@BeforeClass
-    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
-    public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
 		
 	@Test
 	public void testReadOwnWrites() throws Exception {
@@ -248,4 +245,5 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
 		conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
 		testRowConflicts();
 	}
+	
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d81e660e/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
index 9fa69de..63d4851 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -26,21 +26,59 @@ import static org.junit.Assert.assertTrue;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
 
 import co.cask.tephra.Transaction.VisibilityLevel;
 
+@RunWith(Parameterized.class)
 public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+	private final boolean mutable;
+
+	public TxCheckpointIT(boolean localIndex, boolean mutable) { // one instance per data() row
+		this.localIndex = localIndex; // true => tests issue CREATE LOCAL INDEX
+		this.mutable = mutable; // false => tests append IMMUTABLE_ROWS=true to CREATE TABLE
+	}
+	
+	@BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> driverProps = Maps.newHashMapWithExpectedSize(2);
+        // Presumably makes tables transactional by default (CREATE TABLE here omits TRANSACTIONAL=true).
+        driverProps.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.TRUE.toString());
+        // Needed b/c we can't create a transactional table over an existing HBase table (prior state unknown).
+        driverProps.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
+        setUpTestDriver(new ReadOnlyProps(driverProps.entrySet().iterator()));
+    }
+	
+	@Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        // One row per {localIndex, mutable} combination; each row feeds the constructor.
+        Boolean[][] combos = { { false, false }, { false, true }, { true, false }, { true, true } };
+        return Arrays.asList(combos);
+    }
 
-    
     @Test
     public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
@@ -50,7 +88,8 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(true);
         conn.createStatement().execute("CREATE SEQUENCE keys");
-        conn.createStatement().execute("CREATE TABLE txfoo (pk INTEGER PRIMARY KEY, val INTEGER) TRANSACTIONAL=true");
+        conn.createStatement().execute("CREATE TABLE txfoo (pk INTEGER PRIMARY KEY, val INTEGER)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+        conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX idx ON txfoo (val)");
 
         conn.createStatement().execute("UPSERT INTO txfoo VALUES (NEXT VALUE FOR keys,1)");
         for (int i=0; i<6; i++) {
@@ -62,150 +101,239 @@ public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
     }
     
     @Test
-    public void testCheckpointForUpsertSelect() throws Exception {
-        ResultSet rs;
+    public void testRollbackOfUncommittedDelete() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("create table tx1 (id bigint not null primary key) TRANSACTIONAL=true");
-        conn.createStatement().execute("create table tx2 (id bigint not null primary key) TRANSACTIONAL=true");
-
-        conn.createStatement().execute("upsert into tx1 values (1)");
-        conn.createStatement().execute("upsert into tx1 values (2)");
-        conn.createStatement().execute("upsert into tx1 values (3)");
-        conn.commit();
-
-        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
-        state.startTransaction();
-        long wp = state.getWritePointer();
-        conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(4,rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moves
-        wp = state.getWritePointer();
-        
-        conn.createStatement().execute("upsert into tx1 select id from tx2");
-        assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
-        // Write ptr shouldn't move b/c we're not reading from a table with uncommitted data
-        assertEquals(wp, state.getWritePointer()); 
-        
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(5,rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.rollback();
-        
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(3,rs.getLong(1));
-        assertFalse(rs.next());
-
-        wp = state.getWritePointer();
-        conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(4,rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.createStatement().execute("upsert into tx1 select max(id)+1 from tx1");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moves
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(5,rs.getLong(1));
-        assertFalse(rs.next());
-        
-        conn.commit();
-        
-        rs = conn.createStatement().executeQuery("select max(id) from tx1");
-        
-        assertTrue(rs.next());
-        assertEquals(5,rs.getLong(1));
-        assertFalse(rs.next());
-    }  
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX DEMO_idx ON DEMO (v1) INCLUDE(v2)");
+            
+            stmt.executeUpdate("upsert into DEMO values('x1', 'y1', 'a1')");
+            stmt.executeUpdate("upsert into DEMO values('x2', 'y2', 'a2')");
+            
+            // assert both upserted (not yet committed) rows are visible in the data table
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            // assert the same rows when ordering by the indexed column v1 (intended to read the index table)
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.commit(); // both rows are now committed; the delete below starts from this state
+            
+            stmt.executeUpdate("DELETE FROM DEMO WHERE k='x1' AND v1='y1' AND v2='a1'");
+            // assert the uncommitted delete of x1 is visible in the data table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            // assert the uncommitted delete of x1 is visible in the index table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback(); // discard the uncommitted delete
+            
+            // after the rollback, assert both rows are back in the data table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            // after the rollback, assert both rows are back in the index table
+            rs = stmt.executeQuery("select k, v1, v2 from DEMO ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+	@Test
+	public void testCheckpointForUpsertSelect() throws Exception {
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+			conn.setAutoCommit(false);
+			Statement stmt = conn.createStatement();
 
-    @Test
+			stmt.execute("CREATE TABLE DEMO(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
+					+ "INDEX IDX ON DEMO (v1) INCLUDE(v2)");
+			// seed three committed rows (ids 1-3) so that max(id) is 3 before upsertRows runs
+            stmt.executeUpdate("upsert into DEMO values(1, 'a2', 'b1')");
+            stmt.executeUpdate("upsert into DEMO values(2, 'a2', 'b2')");
+            stmt.executeUpdate("upsert into DEMO values(3, 'a3', 'b3')");
+			conn.commit();
+			// uncommitted upserts of ids 4-6 must vanish again on rollback
+			upsertRows(conn);
+			conn.rollback();
+			verifyRows(conn, 3);
+			// the same upserts must survive a commit, in both the data table and the index
+			upsertRows(conn);
+			conn.commit();
+			verifyRows(conn, 6);
+		}
+	}
+
+	/** Asserts that max(id) over DEMO equals expectedMaxId, once per optimizer hint below. */
+	private void verifyRows(Connection conn, int expectedMaxId) throws SQLException {
+		String[] maxIdQueries = {
+				// query the data table
+				"select /*+ NO_INDEX */ max(id) from DEMO",
+				// query the index
+				"select /*+ INDEX(DEMO IDX) */ max(id) from DEMO" };
+		for (String maxIdQuery : maxIdQueries) {
+			ResultSet maxIdRs = conn.createStatement().executeQuery(maxIdQuery);
+			assertTrue(maxIdRs.next());
+			assertEquals(expectedMaxId, maxIdRs.getLong(1));
+			assertFalse(maxIdRs.next());
+		}
+	}
+
+	/**
+	 * Issues three "upsert ... select max(id)+1" statements against DEMO inside one transaction
+	 * and, after each, checks the visibility level, the write pointer and the new max(id).
+	 * The expected values (4, 5, 6) assume max(id) is 3 on entry. The transaction is left
+	 * open: committing or rolling back is up to the caller.
+	 *
+	 * @param conn connection to run the statements on; must unwrap to a PhoenixConnection
+	 * @throws SQLException if unwrapping the connection or any statement fails
+	 */
+	private void upsertRows(Connection conn) throws SQLException {
+		// one statement per round; the literals differ only in the v1/v2 values
+		String[] upserts = {
+				"upsert into DEMO select max(id)+1, 'a4', 'b4' from DEMO",
+				"upsert into DEMO select max(id)+1, 'a5', 'b5' from DEMO",
+				"upsert into DEMO select max(id)+1, 'a6', 'b6' from DEMO" };
+
+		MutationState txState = conn.unwrap(PhoenixConnection.class)
+				.getMutationState();
+		txState.startTransaction();
+		long lastWritePtr = txState.getWritePointer();
+
+		for (int i = 0; i < upserts.length; i++) {
+			conn.createStatement().execute(upserts[i]);
+			assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+					txState.getVisibilityLevel());
+			// Only the later statements are expected to move the write pointer; the first
+			// one runs right after startTransaction(), before anything else was executed.
+			if (i == 0) {
+				// Make sure write ptr didn't move
+				assertEquals(lastWritePtr, txState.getWritePointer());
+			} else {
+				// Make sure write ptr moves
+				assertNotEquals(lastWritePtr, txState.getWritePointer());
+				lastWritePtr = txState.getWritePointer();
+			}
+
+			// the statement's own row must be visible: max(id) becomes 4, then 5, then 6
+			ResultSet maxIdRs = conn.createStatement()
+					.executeQuery("select max(id) from DEMO");
+			assertTrue(maxIdRs.next());
+			assertEquals(4 + i, maxIdRs.getLong(1));
+			assertFalse(maxIdRs.next());
+		}
+	}
+	
+	@Test
     public void testCheckpointForDelete() throws Exception {
-        ResultSet rs;
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(getUrl(), props);
-        conn.createStatement().execute("create table tx3 (id1 bigint primary key, fk1 integer) TRANSACTIONAL=true");
-        conn.createStatement().execute("create table tx4 (id2 bigint primary key, fk2 integer) TRANSACTIONAL=true");
-
-        conn.createStatement().execute("upsert into tx3 values (1, 3)");
-        conn.createStatement().execute("upsert into tx3 values (2, 2)");
-        conn.createStatement().execute("upsert into tx3 values (3, 1)");
-        conn.createStatement().execute("upsert into tx4 values (1, 1)");
-        conn.commit();
-
-        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
-        state.startTransaction();
-        long wp = state.getWritePointer();
-        conn.createStatement().execute("delete from tx3 where id1=fk1");
-        assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
-        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
-
-        rs = conn.createStatement().executeQuery("select id1 from tx3");
-        assertTrue(rs.next());
-        assertEquals(1,rs.getLong(1));
-        assertTrue(rs.next());
-        assertEquals(3,rs.getLong(1));
-        assertFalse(rs.next());
-
-        conn.createStatement().execute("delete from tx3 where id1 in (select fk1 from tx3 join tx4 on (fk2=id1))");
-        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
-        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
-
-        rs = conn.createStatement().executeQuery("select id1 from tx3");
-        assertTrue(rs.next());
-        assertEquals(1,rs.getLong(1));
-        assertFalse(rs.next());
-
-        /*
-         * TODO: file Tephra JIRA, as this fails with an NPE because the
-         * ActionChange has a null family since we're issuing row deletes.
-         * See this code in TransactionAwareHTable.transactionalizeAction(Delete)
-         * and try modifying addToChangeSet(deleteRow, null, null);
-         * to modifying addToChangeSet(deleteRow, family, null);
-            } else {
-              for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) {
-                byte[] family = familyEntry.getKey();
-                List<Cell> entries = familyEntry.getValue();
-                boolean isFamilyDelete = false;
-                if (entries.size() == 1) {
-                  Cell cell = entries.get(0);
-                  isFamilyDelete = CellUtil.isDeleteFamily(cell);
-                }
-                if (isFamilyDelete) {
-                  if (conflictLevel == TxConstants.ConflictDetection.ROW ||
-                      conflictLevel == TxConstants.ConflictDetection.NONE) {
-                    // no need to identify individual columns deleted
-                    txDelete.deleteFamily(family);
-                    addToChangeSet(deleteRow, null, null);
-         */
-//        conn.rollback();
-//        rs = conn.createStatement().executeQuery("select id1 from tx3");
-//        assertTrue(rs.next());
-//        assertEquals(1,rs.getLong(1));
-//        assertTrue(rs.next());
-//        assertEquals(2,rs.getLong(1));
-//        assertTrue(rs.next());
-//        assertEquals(3,rs.getLong(1));
-//        assertFalse(rs.next());
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		ResultSet rs;
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+			conn.setAutoCommit(false);
+			Statement stmt = conn.createStatement();
+			stmt.execute("CREATE TABLE DEMO1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE TABLE DEMO2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
+					+ "INDEX IDX ON DEMO1 (FK1B)");
+			// DEMO1 rows are (id1, fk1a, fk1b); row 2 is the only one with id1 = fk1b
+			stmt.executeUpdate("upsert into DEMO1 values (1, 3, 3)");
+			stmt.executeUpdate("upsert into DEMO1 values (2, 2, 2)");
+			stmt.executeUpdate("upsert into DEMO1 values (3, 1, 1)");
+			stmt.executeUpdate("upsert into DEMO2 values (1, 1)");
+			conn.commit();
 
+	        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+	        state.startTransaction();
+	        long wp = state.getWritePointer();
+	        conn.createStatement().execute("delete from DEMO1 where id1=fk1b AND fk1b=id1");
+	        assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
+	        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
+	        // row 2 must be gone when the data table is read directly
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertFalse(rs.next());
+	        // same check via the index, in fk1b order; the hint must name DEMO1, the table IDX is built on
+	        rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO1 IDX) */ id1 from DEMO1");
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertFalse(rs.next());
+	        // deleting via a subquery over DEMO1 itself is expected to checkpoint (see the asserts)
+	        conn.createStatement().execute("delete from DEMO1 where id1 in (select fk1a from DEMO1 join DEMO2 on (fk2=id1))");
+	        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+	        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
+	        // the subquery yields fk1a = 3 (DEMO1 row 1 joins DEMO2 row 1), so only row 1 remains
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertFalse(rs.next());
+	        // the rollback must undo both uncommitted deletes
+	        conn.rollback();
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from DEMO1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(2,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertFalse(rs.next());
+		}
     }  
+
+    
 }