Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:43:45 UTC

[07/52] [abbrv] phoenix git commit: PHOENIX-1674 Snapshot isolation transaction support through Tephra (James Taylor, Thomas D'Silva)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
new file mode 100644
index 0000000..6bcf7ea
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.tx;
+
+import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.TRANSACTIONAL_DATA_TABLE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.hbase11.coprocessor.TransactionProcessor;
+
+import com.google.common.collect.Lists;
+
+public class TransactionIT extends BaseHBaseManagedTimeIT {
+	
+	private static final String FULL_TABLE_NAME = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + TRANSACTIONAL_DATA_TABLE;
+	
+    @Before
+    public void setUp() throws SQLException {
+        ensureTableCreated(getUrl(), TRANSACTIONAL_DATA_TABLE);
+    }
+		
+	@Test
+	public void testReadOwnWrites() throws Exception {
+		String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+		try (Connection conn = DriverManager.getConnection(getUrl())) {
+			conn.setAutoCommit(false);
+			ResultSet rs = conn.createStatement().executeQuery(selectSql);
+	     	assertFalse(rs.next());
+	     	
+	        String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+	        PreparedStatement stmt = conn.prepareStatement(upsert);
+			// upsert two rows
+			TestUtil.setRowKeyColumns(stmt, 1);
+			stmt.execute();
+			TestUtil.setRowKeyColumns(stmt, 2);
+			stmt.execute();
+	        
+	        // verify rows can be read even though commit has not been called
+			rs = conn.createStatement().executeQuery(selectSql);
+			TestUtil.validateRowKeyColumns(rs, 1);
+			TestUtil.validateRowKeyColumns(rs, 2);
+	        assertFalse(rs.next());
+	        
+	        conn.commit();
+	        
+	        // verify rows can be read after commit
+	        rs = conn.createStatement().executeQuery(selectSql);
+	        TestUtil.validateRowKeyColumns(rs, 1);
+	        TestUtil.validateRowKeyColumns(rs, 2);
+	        assertFalse(rs.next());
+		}
+	}
+	
+	@Test
+	public void testTxnClosedCorrectly() throws Exception {
+		String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+		try (Connection conn = DriverManager.getConnection(getUrl())) {
+			conn.setAutoCommit(false);
+			ResultSet rs = conn.createStatement().executeQuery(selectSql);
+	     	assertFalse(rs.next());
+	     	
+	        String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+	        PreparedStatement stmt = conn.prepareStatement(upsert);
+			// upsert two rows
+			TestUtil.setRowKeyColumns(stmt, 1);
+			stmt.execute();
+			TestUtil.setRowKeyColumns(stmt, 2);
+			stmt.execute();
+	        
+	        // verify rows can be read even though commit has not been called
+			rs = conn.createStatement().executeQuery(selectSql);
+			TestUtil.validateRowKeyColumns(rs, 1);
+			TestUtil.validateRowKeyColumns(rs, 2);
+	        assertFalse(rs.next());
+	        
+	        conn.close();
+	        // wait for any open txns to time out
+	        Thread.sleep(DEFAULT_TXN_TIMEOUT_SECONDS*1000+10000);
+	        assertTrue("There should be no invalid transactions", txManager.getInvalidSize()==0);
+		}
+	}
+	
+    @Test
+    public void testDelete() throws Exception {
+        String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME;
+        try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+        		Connection conn2 = DriverManager.getConnection(getUrl())) {
+            conn1.setAutoCommit(false);
+            ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+            assertFalse(rs.next());
+            
+            String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+            PreparedStatement stmt = conn1.prepareStatement(upsert);
+            // upsert two rows
+            TestUtil.setRowKeyColumns(stmt, 1);
+            stmt.execute();
+            conn1.commit();
+            
+            TestUtil.setRowKeyColumns(stmt, 2);
+            stmt.execute();
+            
+            // delete all rows (the first row is committed, the second is still uncommitted)
+            int rowsDeleted = conn1.createStatement().executeUpdate("DELETE FROM " + FULL_TABLE_NAME);
+            assertEquals(2, rowsDeleted);
+            
+            // The delete and the second upsert are not committed yet, so conn2 still sees the one committed row.
+            rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + FULL_TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            
+            conn1.commit();
+            
+            // verify rows are deleted after commit
+            rs = conn1.createStatement().executeQuery(selectSQL);
+            assertFalse(rs.next());
+        }
+    }
+    
+	@Test
+	public void testAutoCommitQuerySingleTable() throws Exception {
+		try (Connection conn = DriverManager.getConnection(getUrl())) {
+			conn.setAutoCommit(true);
+			// verify no rows returned
+			ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME);
+			assertFalse(rs.next());
+		}
+	}
+	
+    @Test
+    public void testAutoCommitQueryMultiTables() throws Exception {
+    	try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(true);
+            // verify no rows returned
+            ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + FULL_TABLE_NAME + " a JOIN " + FULL_TABLE_NAME + " b ON (a.long_pk = b.int_pk)");
+            assertFalse(rs.next());
+        } 
+    }
+    
+	@Test
+	public void testColConflicts() throws Exception {
+		try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+        		Connection conn2 = DriverManager.getConnection(getUrl())) {
+			conn1.setAutoCommit(false);
+			conn2.setAutoCommit(false);
+			String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+			conn1.setAutoCommit(false);
+			ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+	     	assertFalse(rs.next());
+			// upsert row using conn1
+			String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
+			PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+			TestUtil.setRowKeyColumns(stmt, 1);
+			stmt.setInt(7, 10);
+	        stmt.execute();
+	        // upsert row using conn2
+ 			stmt = conn2.prepareStatement(upsertSql);
+ 			TestUtil.setRowKeyColumns(stmt, 1);
+			stmt.setInt(7, 11);
+	        stmt.execute();
+ 	        
+ 	        conn1.commit();
+	        //second commit should fail
+ 	        try {
+ 	 	        conn2.commit();
+ 	 	        fail();
+ 	        }	
+ 	        catch (SQLException e) {
+ 	        	assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode());
+ 	        }
+		}
+	}
+	
+	private void testRowConflicts() throws Exception {
+		try (Connection conn1 = DriverManager.getConnection(getUrl()); 
+        		Connection conn2 = DriverManager.getConnection(getUrl())) {
+			conn1.setAutoCommit(false);
+			conn2.setAutoCommit(false);
+			String selectSql = "SELECT * FROM "+FULL_TABLE_NAME;
+			conn1.setAutoCommit(false);
+			ResultSet rs = conn1.createStatement().executeQuery(selectSql);
+			boolean immutableRows = conn1.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, FULL_TABLE_NAME)).isImmutableRows();
+	     	assertFalse(rs.next());
+			// upsert row using conn1
+			String upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, a.int_col1) VALUES(?, ?, ?, ?, ?, ?, ?)";
+			PreparedStatement stmt = conn1.prepareStatement(upsertSql);
+			TestUtil.setRowKeyColumns(stmt, 1);
+			stmt.setInt(7, 10);
+	        stmt.execute();
+	        // upsert row using conn2
+	        upsertSql = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, b.int_col2) VALUES(?, ?, ?, ?, ?, ?, ?)";
+ 			stmt = conn2.prepareStatement(upsertSql);
+ 			TestUtil.setRowKeyColumns(stmt, 1);
+			stmt.setInt(7, 11);
+ 	        stmt.execute();
+ 	        
+ 	        conn1.commit();
+	        //second commit should fail
+ 	        try {
+ 	 	        conn2.commit();
+ 	 	        if (!immutableRows) fail();
+ 	        }	
+ 	        catch (SQLException e) {
+ 	        	if (immutableRows) fail();
+ 	        	assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode());
+ 	        }
+		}
+	}
+	
+	@Test
+	public void testRowConflictDetected() throws Exception {
+		testRowConflicts();
+	}
+	
+	@Test
+	public void testNoConflictDetectionForImmutableRows() throws Exception {
+		Connection conn = DriverManager.getConnection(getUrl());
+		conn.createStatement().execute("ALTER TABLE " + FULL_TABLE_NAME + " SET IMMUTABLE_ROWS=true");
+		testRowConflicts();
+	}
+    
+    @Test
+    public void testNonTxToTxTable() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE NON_TX_TABLE(k INTEGER PRIMARY KEY, v VARCHAR)");
+        conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (1)");
+        conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (2, 'a')");
+        conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (3, 'b')");
+        conn.commit();
+        
+        conn.createStatement().execute("CREATE INDEX IDX ON NON_TX_TABLE(v)");
+        // Reset the empty key value column to an empty byte array, as it would be for a pre-transactions table
+        HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE"));
+        List<Put> puts = Lists.newArrayList(new Put(PInteger.INSTANCE.toBytes(1)), new Put(PInteger.INSTANCE.toBytes(2)), new Put(PInteger.INSTANCE.toBytes(3)));
+        for (Put put : puts) {
+            put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY);
+        }
+        htable.put(puts);
+        
+        conn.createStatement().execute("ALTER TABLE NON_TX_TABLE SET TRANSACTIONAL=true");
+        
+        htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE"));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+        htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("IDX"));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+
+        conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (4, 'c')");
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE WHERE v IS NULL");
+        assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "NON_TX_TABLE")).isTransactional());
+        assertTrue(rs.next());
+        assertEquals(1,rs.getInt(1));
+        assertFalse(rs.next());
+        conn.commit();
+        
+        conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (5, 'd')");
+        rs = conn.createStatement().executeQuery("SELECT k FROM NON_TX_TABLE");
+        assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "IDX")).isTransactional());
+        assertTrue(rs.next());
+        assertEquals(1,rs.getInt(1));
+        assertTrue(rs.next());
+        assertEquals(2,rs.getInt(1));
+        assertTrue(rs.next());
+        assertEquals(3,rs.getInt(1));
+        assertTrue(rs.next());
+        assertEquals(4,rs.getInt(1));
+        assertTrue(rs.next());
+        assertEquals(5,rs.getInt(1));
+        assertFalse(rs.next());
+        conn.rollback();
+        
+        rs = conn.createStatement().executeQuery("SELECT k FROM NON_TX_TABLE");
+        assertTrue(rs.next());
+        assertEquals(1,rs.getInt(1));
+        assertTrue(rs.next());
+        assertEquals(2,rs.getInt(1));
+        assertTrue(rs.next());
+        assertEquals(3,rs.getInt(1));
+        assertTrue(rs.next());
+        assertEquals(4,rs.getInt(1));
+        assertFalse(rs.next());
+    }
+    
+    @Test
+    public void testNonTxToTxTableFailure() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        // Put table in SYSTEM schema to prevent attempts to update the cache after we disable SYSTEM.CATALOG
+        conn.createStatement().execute("CREATE TABLE SYSTEM.NON_TX_TABLE(k INTEGER PRIMARY KEY, v VARCHAR)");
+        conn.createStatement().execute("UPSERT INTO SYSTEM.NON_TX_TABLE VALUES (1)");
+        conn.commit();
+        // Reset the empty key value column to an empty byte array, as it would be for a pre-transactions table
+        HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE"));
+        Put put = new Put(PInteger.INSTANCE.toBytes(1));
+        put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, ByteUtil.EMPTY_BYTE_ARRAY);
+        htable.put(put);
+        
+        HBaseAdmin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin();
+        admin.disableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+        try {
+            // This will succeed initially in updating the HBase metadata, but will then fail when
+            // the update to the SYSTEM.CATALOG table is attempted, exercising the code that restores
+            // the coprocessors back to the non-transactional ones.
+            conn.createStatement().execute("ALTER TABLE SYSTEM.NON_TX_TABLE SET TRANSACTIONAL=true");
+            fail();
+        } catch (SQLException e) {
+            assertTrue(e.getMessage().contains(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " is disabled"));
+        } finally {
+            admin.enableTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+            admin.close();
+        }
+        
+        ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM SYSTEM.NON_TX_TABLE WHERE v IS NULL");
+        assertTrue(rs.next());
+        assertEquals(1,rs.getInt(1));
+        assertFalse(rs.next());
+        
+        htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE"));
+        assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+        assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
+                getTableDescriptor(Bytes.toBytes("SYSTEM.NON_TX_TABLE")).
+                getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
+    }
+    
+    @Test
+    public void testProperties() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("CREATE TABLE NON_TX_TABLE1(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR) TTL=1000");
+        conn.createStatement().execute("CREATE INDEX idx1 ON NON_TX_TABLE1(a.v, b.v) TTL=1000");
+        conn.createStatement().execute("CREATE INDEX idx2 ON NON_TX_TABLE1(c.v) INCLUDE (a.v, b.v) TTL=1000");
+
+        conn.createStatement().execute("ALTER TABLE NON_TX_TABLE1 SET TRANSACTIONAL=true");
+
+        HTableDescriptor desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("NON_TX_TABLE1"));
+        for (HColumnDescriptor colDesc : desc.getFamilies()) {
+            assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
+            assertEquals(1000, colDesc.getTimeToLive());
+            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+        }
+
+        desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX1"));
+        for (HColumnDescriptor colDesc : desc.getFamilies()) {
+            assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
+            assertEquals(1000, colDesc.getTimeToLive());
+            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+        }
+        
+        desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("IDX2"));
+        for (HColumnDescriptor colDesc : desc.getFamilies()) {
+            assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
+            assertEquals(1000, colDesc.getTimeToLive());
+            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+        }
+        
+        conn.createStatement().execute("CREATE TABLE NON_TX_TABLE2(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
+        conn.createStatement().execute("ALTER TABLE NON_TX_TABLE2 SET TRANSACTIONAL=true, VERSIONS=10");
+        desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("NON_TX_TABLE2"));
+        for (HColumnDescriptor colDesc : desc.getFamilies()) {
+            assertEquals(10, colDesc.getMaxVersions());
+            assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive());
+            assertEquals(null, colDesc.getValue(TxConstants.PROPERTY_TTL));
+        }
+        conn.createStatement().execute("ALTER TABLE NON_TX_TABLE2 SET TTL=1000");
+        desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("NON_TX_TABLE2"));
+        for (HColumnDescriptor colDesc : desc.getFamilies()) {
+            assertEquals(10, colDesc.getMaxVersions());
+            assertEquals(1000, colDesc.getTimeToLive());
+            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+        }
+
+        conn.createStatement().execute("CREATE TABLE NON_TX_TABLE3(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
+        conn.createStatement().execute("ALTER TABLE NON_TX_TABLE3 SET TRANSACTIONAL=true, b.VERSIONS=10, c.VERSIONS=20");
+        desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("NON_TX_TABLE3"));
+        assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, desc.getFamily(Bytes.toBytes("A")).getMaxVersions());
+        assertEquals(10, desc.getFamily(Bytes.toBytes("B")).getMaxVersions());
+        assertEquals(20, desc.getFamily(Bytes.toBytes("C")).getMaxVersions());
+
+        conn.createStatement().execute("CREATE TABLE NON_TX_TABLE4(k INTEGER PRIMARY KEY, a.v VARCHAR, b.v VARCHAR, c.v VARCHAR)");
+        try {
+            conn.createStatement().execute("ALTER TABLE NON_TX_TABLE4 SET TRANSACTIONAL=true, VERSIONS=1");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE.getErrorCode(), e.getErrorCode());
+        }
+
+        try {
+            conn.createStatement().execute("ALTER TABLE NON_TX_TABLE4 SET TRANSACTIONAL=true, b.VERSIONS=1");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE.getErrorCode(), e.getErrorCode());
+        }
+        
+        conn.createStatement().execute("CREATE TABLE TX_TABLE1(k INTEGER PRIMARY KEY, v VARCHAR) TTL=1000, TRANSACTIONAL=true");
+        desc = conn.unwrap(PhoenixConnection.class).getQueryServices().getTableDescriptor(Bytes.toBytes("TX_TABLE1"));
+        for (HColumnDescriptor colDesc : desc.getFamilies()) {
+            assertEquals(QueryServicesOptions.DEFAULT_MAX_VERSIONS_TRANSACTIONAL, colDesc.getMaxVersions());
+            assertEquals(HColumnDescriptor.DEFAULT_TTL, colDesc.getTimeToLive());
+            assertEquals(1000, Integer.parseInt(colDesc.getValue(TxConstants.PROPERTY_TTL)));
+        }
+    }
+    
+    @Test
+    public void testCreateTableToBeTransactional() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String ddl = "CREATE TABLE TEST_TRANSACTIONAL_TABLE (k varchar primary key) transactional=true";
+        conn.createStatement().execute(ddl);
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        PTable table = pconn.getTable(new PTableKey(null, "TEST_TRANSACTIONAL_TABLE"));
+        HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
+        assertTrue(table.isTransactional());
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+        
+        try {
+            ddl = "ALTER TABLE TEST_TRANSACTIONAL_TABLE SET transactional=false";
+            conn.createStatement().execute(ddl);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode());
+        }
+
+        HBaseAdmin admin = pconn.getQueryServices().getAdmin();
+        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING"));
+        desc.addFamily(new HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES));
+        admin.createTable(desc);
+        ddl = "CREATE TABLE TXN_TEST_EXISTING (k varchar primary key) transactional=true";
+        conn.createStatement().execute(ddl);
+        assertEquals(Boolean.TRUE.toString(), admin.getTableDescriptor(TableName.valueOf("TXN_TEST_EXISTING")).getValue(TxConstants.READ_NON_TX_DATA));
+        
+        // Even though the HBase metadata matches, re-creating the table without TRANSACTIONAL=true must fail,
+        // since a transactional table may not be switched back to non-transactional.
+        ddl = "CREATE TABLE IF NOT EXISTS TEST_TRANSACTIONAL_TABLE (k varchar primary key)"; 
+        try {
+            conn.createStatement().execute(ddl);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX.getErrorCode(), e.getErrorCode());
+        }
+        ddl += " transactional=true";
+        conn.createStatement().execute(ddl);
+        table = pconn.getTable(new PTableKey(null, "TEST_TRANSACTIONAL_TABLE"));
+        htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
+        assertTrue(table.isTransactional());
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+    }
+
+    @Test
+    public void testCurrentDate() throws Exception {
+		String selectSql = "SELECT current_date() FROM "+FULL_TABLE_NAME;
+		try (Connection conn = DriverManager.getConnection(getUrl())) {
+			conn.setAutoCommit(false);
+			ResultSet rs = conn.createStatement().executeQuery(selectSql);
+	     	assertFalse(rs.next());
+	     	
+	        String upsert = "UPSERT INTO " + FULL_TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+	        PreparedStatement stmt = conn.prepareStatement(upsert);
+			// upsert a single row
+			TestUtil.setRowKeyColumns(stmt, 1);
+			stmt.execute();
+			conn.commit();
+			
+			rs = conn.createStatement().executeQuery(selectSql);
+			assertTrue(rs.next());
+			Date date1 = rs.getDate(1);
+	     	assertFalse(rs.next());
+	     	
+	     	Thread.sleep(1000);
+	     	
+	     	rs = conn.createStatement().executeQuery(selectSql);
+			assertTrue(rs.next());
+			Date date2 = rs.getDate(1);
+	     	assertFalse(rs.next());
+	     	assertTrue("current_date() should change while executing multiple statements", date2.getTime() > date1.getTime());
+		}
+	}
+    
+    @Test
+    public void testReCreateTxnTableAfterDroppingExistingNonTxnTable() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
+        stmt.execute("DROP TABLE DEMO");
+        stmt.execute("CREATE TABLE DEMO(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) TRANSACTIONAL=true");
+        stmt.execute("CREATE INDEX DEMO_IDX ON DEMO (v1) INCLUDE(v2)");
+        assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "DEMO")).isTransactional());
+        assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "DEMO_IDX")).isTransactional());
+    }
+}
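
The tests above exercise Phoenix's snapshot isolation end to end: uncommitted writes are visible only to the writing connection, and concurrent writes to the same row fail the second commit with TRANSACTION_CONFLICT_EXCEPTION. As a quick orientation, here is a minimal standalone sketch of that client-side pattern; it is not part of this commit, and the connection URL jdbc:phoenix:localhost and table name T are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.phoenix.exception.SQLExceptionCode;

public class TxUsageSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:phoenix:localhost"; // placeholder connection URL
        try (Connection conn1 = DriverManager.getConnection(url);
                Connection conn2 = DriverManager.getConnection(url)) {
            conn1.setAutoCommit(true);
            conn1.createStatement().execute(
                    "CREATE TABLE IF NOT EXISTS T (k INTEGER PRIMARY KEY, v VARCHAR) TRANSACTIONAL=true");
            conn1.setAutoCommit(false);
            conn2.setAutoCommit(false);

            // Read-own-writes: the uncommitted row is visible to conn1 ...
            conn1.createStatement().execute("UPSERT INTO T VALUES (1, 'a')");
            ResultSet rs = conn1.createStatement().executeQuery("SELECT count(*) FROM T");
            rs.next();
            System.out.println("rows visible to conn1 before commit: " + rs.getLong(1));

            // ... but not to conn2 until conn1 commits.
            rs = conn2.createStatement().executeQuery("SELECT count(*) FROM T");
            rs.next();
            System.out.println("rows visible to conn2 before commit: " + rs.getLong(1));

            // Concurrent write to the same row: the second committer loses.
            conn2.createStatement().execute("UPSERT INTO T VALUES (1, 'b')");
            conn1.commit();
            try {
                conn2.commit();
            } catch (SQLException e) {
                if (e.getErrorCode() != SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode()) {
                    throw e;
                }
                System.out.println("write-write conflict detected, as expected");
            }
        }
    }
}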

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
new file mode 100644
index 0000000..34869dd
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxCheckpointIT.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.tx;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import co.cask.tephra.Transaction.VisibilityLevel;
+
+import com.google.common.collect.Maps;
+
+@RunWith(Parameterized.class)
+public class TxCheckpointIT extends BaseHBaseManagedTimeIT {
+	
+	private final boolean localIndex;
+	private final boolean mutable;
+	private String tableName;
+    private String indexName;
+    private String seqName;
+    private String fullTableName;
+
+	public TxCheckpointIT(boolean localIndex, boolean mutable) {
+		this.localIndex = localIndex;
+		this.mutable = mutable;
+		this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
+        this.indexName = "IDX_" + System.currentTimeMillis();
+        this.seqName = "SEQ_" + System.currentTimeMillis();
+        this.fullTableName = SchemaUtil.getTableName(tableName, tableName);
+	}
+	
+	@BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, Boolean.toString(true));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+	
+	@Parameters(name="localIndex = {0} , mutable = {1}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false }, { false, true }, { true, false }, { true, true }  
+           });
+    }
+    
+    @Test
+    public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        conn.createStatement().execute("CREATE SEQUENCE "+seqName);
+        conn.createStatement().execute("CREATE TABLE " + fullTableName + "(pk INTEGER PRIMARY KEY, val INTEGER)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+        conn.createStatement().execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + "(val)");
+
+        conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES (NEXT VALUE FOR " + seqName + ",1)");
+        for (int i=0; i<6; i++) {
+            Statement stmt = conn.createStatement();
+            int upsertCount = stmt.executeUpdate("UPSERT INTO " + fullTableName + " SELECT NEXT VALUE FOR " + seqName + ", val FROM " + fullTableName);
+            assertEquals((int)Math.pow(2, i), upsertCount);
+        }
+        conn.close();
+    }
+    
+    @Test
+    public void testRollbackOfUncommittedDelete() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + fullTableName + "(k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"+(!mutable? " IMMUTABLE_ROWS=true" : ""));
+            stmt.execute("CREATE "+(localIndex? "LOCAL " : "")+"INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)");
+            
+            stmt.executeUpdate("upsert into " + fullTableName + " values('x1', 'y1', 'a1')");
+            stmt.executeUpdate("upsert into " + fullTableName + " values('x2', 'y2', 'a2')");
+            
+            //assert values in data table
+            ResultSet rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert values in index table
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.commit();
+            
+            stmt.executeUpdate("DELETE FROM " + fullTableName + " WHERE k='x1' AND v1='y1' AND v2='a1'");
+            // assert the row is deleted in the data table
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            // assert the row is deleted in the index table
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert two rows in data table
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY k");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+            
+            //assert two rows in index table
+            rs = stmt.executeQuery("select k, v1, v2 from " + fullTableName + " ORDER BY v1");
+            assertTrue(rs.next());
+            assertEquals("x1", rs.getString(1));
+            assertEquals("y1", rs.getString(2));
+            assertEquals("a1", rs.getString(3));
+            assertTrue(rs.next());
+            assertEquals("x2", rs.getString(1));
+            assertEquals("y2", rs.getString(2));
+            assertEquals("a2", rs.getString(3));
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+	@Test
+	public void testCheckpointForUpsertSelect() throws Exception {
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+			conn.setAutoCommit(false);
+			Statement stmt = conn.createStatement();
+
+			stmt.execute("CREATE TABLE " + fullTableName + "(ID BIGINT NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
+					+ "INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE(v2)");
+
+            stmt.executeUpdate("upsert into " + fullTableName + " values(1, 'a2', 'b1')");
+            stmt.executeUpdate("upsert into " + fullTableName + " values(2, 'a2', 'b2')");
+            stmt.executeUpdate("upsert into " + fullTableName + " values(3, 'a3', 'b3')");
+			conn.commit();
+
+			upsertRows(conn);
+			conn.rollback();
+			verifyRows(conn, 3);
+
+			upsertRows(conn);
+			conn.commit();
+			verifyRows(conn, 6);
+		}
+	}
+
+	private void verifyRows(Connection conn, int expectedMaxId) throws SQLException {
+		ResultSet rs;
+		//query the data table
+		rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ max(id) from " + fullTableName + "");
+		assertTrue(rs.next());
+		assertEquals(expectedMaxId, rs.getLong(1));
+		assertFalse(rs.next());
+		
+		// query the index
+		rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ max(id) from " + fullTableName + "");
+		assertTrue(rs.next());
+		assertEquals(expectedMaxId, rs.getLong(1));
+		assertFalse(rs.next());
+	}
+
+	private void upsertRows(Connection conn) throws SQLException {
+		ResultSet rs;
+		MutationState state = conn.unwrap(PhoenixConnection.class)
+				.getMutationState();
+		state.startTransaction();
+		long wp = state.getWritePointer();
+		conn.createStatement().execute(
+				"upsert into " + fullTableName + " select max(id)+1, 'a4', 'b4' from " + fullTableName + "");
+		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+				state.getVisibilityLevel());
+		assertEquals(wp, state.getWritePointer()); // Make sure write ptr
+													// didn't move
+		rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");
+
+		assertTrue(rs.next());
+		assertEquals(4, rs.getLong(1));
+		assertFalse(rs.next());
+
+		conn.createStatement().execute(
+				"upsert into " + fullTableName + " select max(id)+1, 'a5', 'b5' from " + fullTableName + "");
+		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+				state.getVisibilityLevel());
+		assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
+														// moves
+		wp = state.getWritePointer();
+		rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");
+
+		assertTrue(rs.next());
+		assertEquals(5, rs.getLong(1));
+		assertFalse(rs.next());
+		
+		conn.createStatement().execute(
+				"upsert into " + fullTableName + " select max(id)+1, 'a6', 'b6' from " + fullTableName + "");
+		assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT,
+				state.getVisibilityLevel());
+		assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr
+														// moves
+		wp = state.getWritePointer();
+		rs = conn.createStatement().executeQuery("select max(id) from " + fullTableName + "");
+
+		assertTrue(rs.next());
+		assertEquals(6, rs.getLong(1));
+		assertFalse(rs.next());
+	}
+	
+	@Test
+    public void testCheckpointForDeleteAndUpsert() throws Exception {
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		ResultSet rs;
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+			conn.setAutoCommit(false);
+			Statement stmt = conn.createStatement();
+			stmt.execute("CREATE TABLE " + fullTableName + "1(ID1 BIGINT NOT NULL PRIMARY KEY, FK1A INTEGER, FK1B INTEGER)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE TABLE " + fullTableName + "2(ID2 BIGINT NOT NULL PRIMARY KEY, FK2 INTEGER)"
+					+ (!mutable ? " IMMUTABLE_ROWS=true" : ""));
+			stmt.execute("CREATE " + (localIndex ? "LOCAL " : "")
+					+ "INDEX " + indexName + " ON " + fullTableName + "1 (FK1B)");
+			
+			stmt.executeUpdate("upsert into " + fullTableName + "1 values (1, 3, 3)");
+			stmt.executeUpdate("upsert into " + fullTableName + "1 values (2, 2, 2)");
+			stmt.executeUpdate("upsert into " + fullTableName + "1 values (3, 1, 1)");
+			stmt.executeUpdate("upsert into " + fullTableName + "2 values (1, 1)");
+			conn.commit();
+
+	        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+	        state.startTransaction();
+	        long wp = state.getWritePointer();
+	        conn.createStatement().execute("delete from " + fullTableName + "1 where id1=fk1b AND fk1b=id1");
+	        assertEquals(VisibilityLevel.SNAPSHOT, state.getVisibilityLevel());
+	        assertEquals(wp, state.getWritePointer()); // Make sure write ptr didn't move
+	
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertFalse(rs.next());
+	        
+	        rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+            assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertFalse(rs.next());
+	
+	        conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
+	        assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+	        assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
+	
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertFalse(rs.next());
+	
+            rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+            assertTrue(rs.next());
+            assertEquals(1,rs.getLong(1));
+            assertFalse(rs.next());
+    
+            stmt.executeUpdate("upsert into " + fullTableName + "1 SELECT id1 + 3, id1, id1 FROM " + fullTableName + "1");
+            stmt.executeUpdate("upsert into " + fullTableName + "2 values (2, 4)");
+
+            conn.createStatement().execute("delete from " + fullTableName + "1 where id1 in (select fk1a from " + fullTableName + "1 join " + fullTableName + "2 on (fk2=id1))");
+            assertEquals(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT, state.getVisibilityLevel());
+            assertNotEquals(wp, state.getWritePointer()); // Make sure write ptr moved
+    
+            rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+            assertTrue(rs.next());
+            assertEquals(4,rs.getLong(1));
+            assertFalse(rs.next());
+    
+            rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+            assertTrue(rs.next());
+            assertEquals(4,rs.getLong(1));
+            assertFalse(rs.next());
+    
+	        conn.rollback();
+	        
+	        rs = conn.createStatement().executeQuery("select /*+ NO_INDEX */ id1 from " + fullTableName + "1");
+	        assertTrue(rs.next());
+	        assertEquals(1,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(2,rs.getLong(1));
+	        assertTrue(rs.next());
+	        assertEquals(3,rs.getLong(1));
+	        assertFalse(rs.next());
+
+	        rs = conn.createStatement().executeQuery("select /*+ INDEX(DEMO IDX) */ id1 from " + fullTableName + "1");
+            assertTrue(rs.next());
+            assertEquals(3,rs.getLong(1));
+            assertTrue(rs.next());
+            assertEquals(2,rs.getLong(1));
+            assertTrue(rs.next());
+            assertEquals(1,rs.getLong(1));
+            assertFalse(rs.next());
+		}
+    }  
+
+    
+}
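
TxCheckpointIT above drives self-referential statements (UPSERT ... SELECT ... FROM the same table, DELETE with a subquery over the same table) and asserts that the MutationState write pointer and Tephra visibility level move only when a checkpoint is actually needed. At the SQL level, the observable behavior is that each statement runs against a snapshot that excludes its own writes, so an UPSERT SELECT over its target table doubles the row count instead of feeding on itself, and a rollback discards all checkpointed work together. A minimal standalone sketch of that behavior follows; it is not part of this commit, and the connection URL, table name T, and sequence name SEQ are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CheckpointSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
            conn.setAutoCommit(false);
            Statement stmt = conn.createStatement();
            stmt.execute("CREATE SEQUENCE IF NOT EXISTS SEQ");
            stmt.execute("CREATE TABLE IF NOT EXISTS T (pk INTEGER PRIMARY KEY, val INTEGER) TRANSACTIONAL=true");
            stmt.executeUpdate("UPSERT INTO T VALUES (NEXT VALUE FOR SEQ, 1)");
            conn.commit();

            // Each UPSERT SELECT sees everything written before it in this
            // transaction, but not the rows it is writing itself, so the
            // number of rows upserted doubles each iteration: 1, 2, 4.
            for (int i = 0; i < 3; i++) {
                int upserted = stmt.executeUpdate(
                        "UPSERT INTO T SELECT NEXT VALUE FOR SEQ, val FROM T");
                System.out.println("iteration " + i + ": upserted " + upserted + " rows");
            }

            // The rollback discards all of the checkpointed statements at once,
            // leaving only the originally committed row.
            conn.rollback();
        }
    }
}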

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
new file mode 100644
index 0000000..a3b3aa2
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TxPointInTimeQueryIT.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.tx;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TxPointInTimeQueryIT extends BaseClientManagedTimeIT {
+
+	protected long ts;
+
+	@Before
+	public void initTable() throws Exception {
+		ts = nextTimestamp();
+	}
+
+	@Test
+	public void testQueryWithSCN() throws Exception {
+		Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+		props.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+		try (Connection conn = DriverManager.getConnection(getUrl(), props);) {
+			try {
+				conn.createStatement().execute(
+								"CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR) TRANSACTIONAL=true");
+				fail();
+			} catch (SQLException e) {
+				assertEquals("Unexpected Exception",
+						SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET
+								.getErrorCode(), e.getErrorCode());
+			}
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
index a1563db..359e08f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/IndexMetaDataCache.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import co.cask.tephra.Transaction;
+
 import org.apache.phoenix.index.IndexMaintainer;
 
 public interface IndexMetaDataCache extends Closeable {
@@ -36,6 +38,13 @@ public interface IndexMetaDataCache extends Closeable {
         public List<IndexMaintainer> getIndexMaintainers() {
             return Collections.emptyList();
         }
+
+        @Override
+        public Transaction getTransaction() {
+            return null;
+        }
+        
     };
     public List<IndexMaintainer> getIndexMaintainers();
+    public Transaction getTransaction();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index 3cfdd71..f188ab2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -71,6 +71,7 @@ import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.protobuf.HBaseZeroCopyByteString;
 
 /**
  * 
@@ -144,7 +145,7 @@ public class ServerCacheClient {
 
     }
     
-    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
+    public ServerCache addServerCache(ScanRanges keyRanges, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCacheFactory cacheFactory, final TableRef cacheUsingTableRef) throws SQLException {
         ConnectionQueryServices services = connection.getQueryServices();
         MemoryChunk chunk = services.getMemoryManager().allocate(cachePtr.getLength());
         List<Closeable> closeables = new ArrayList<Closeable>();
@@ -213,6 +214,7 @@ public class ServerCacheClient {
                                                     ServerCacheFactoryProtos.ServerCacheFactory.Builder svrCacheFactoryBuider = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
                                                     svrCacheFactoryBuider.setClassName(cacheFactory.getClass().getName());
                                                     builder.setCacheFactory(svrCacheFactoryBuider.build());
+                                                    builder.setTxState(HBaseZeroCopyByteString.wrap(txState));
                                                     instance.addServerCache(controller, builder.build(), rpcCallback);
                                                     if(controller.getFailedOn() != null) {
                                                         throw controller.getFailedOn();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index b968a9b..c7cd58f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -37,6 +37,6 @@ import org.apache.phoenix.memory.MemoryManager;
 public interface TenantCache {
     MemoryManager getMemoryManager();
     Closeable getServerCache(ImmutableBytesPtr cacheId);
-    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException;
+    Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
     void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 9005fa8..6ef7a6f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -23,14 +23,17 @@ import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import com.google.common.cache.*;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.util.Closeables;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+
 /**
  * 
  * Cache per tenant on server side.  Tracks memory usage for each
@@ -80,11 +83,11 @@ public class TenantCacheImpl implements TenantCache {
     }
     
     @Override
-    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException {
-        MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength());
+    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException {
+        MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
         boolean success = false;
         try {
-            Closeable element = cacheFactory.newCache(cachePtr, chunk);
+            Closeable element = cacheFactory.newCache(cachePtr, txState, chunk);
             getServerCaches().put(cacheId, element);
             success = true;
             return element;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
index f09b508..d5b74fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/CreateTableCompiler.java
@@ -132,8 +132,8 @@ public class CreateTableCompiler {
                             // on our connection.
                             new DelegateConnectionQueryServices(connection.getQueryServices()) {
                                 @Override
-                                public PMetaData addTable(PTable table) throws SQLException {
-                                    return connection.addTable(table);
+                                public PMetaData addTable(PTable table, long resolvedTime) throws SQLException {
+                                    return connection.addTable(table, resolvedTime);
                                 }
                             },
                             connection, tableRef.getTimeStamp());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f0f693e..12aa7c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -78,6 +78,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.ScanUtil;
 
@@ -323,16 +324,20 @@ public class DeleteCompiler {
                 // - read-only VIEW 
                 // - transactional table with a connection having an SCN
                 // TODO: SchemaUtil.isReadOnly(PTable, connection)?
-                if ( table.getType() == PTableType.VIEW && table.getViewType().isReadOnly() ) {
+                if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
                     throw new ReadOnlyTableException(schemaName,tableName);
                 }
+                else if (table.isTransactional() && connection.getSCN() != null) {
+                   throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
+                   .setTableName(tableName).build().buildException();
+                }
                 
                 immutableIndex = getNonDisabledImmutableIndexes(tableRefToBe);
                 boolean mayHaveImmutableIndexes = !immutableIndex.isEmpty();
                 noQueryReqd = !hasLimit;
                 // Can't run on same server for transactional data, as we need the row keys for the data
                 // that is being upserted for conflict detection purposes.
-                runOnServer = isAutoCommit && noQueryReqd;
+                runOnServer = isAutoCommit && noQueryReqd && !table.isTransactional();
                 HintNode hint = delete.getHint();
                 if (runOnServer && !delete.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) {
                     hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE);
@@ -363,8 +368,8 @@ public class DeleteCompiler {
                 QueryCompiler compiler = new QueryCompiler(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory, new SequenceManager(statement));
                 dataPlanToBe = compiler.compile();
                 queryPlans = Lists.newArrayList(mayHaveImmutableIndexes
-                        ? optimizer.getApplicablePlans(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)
-                        : optimizer.getBestPlan(statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory));
+                        ? optimizer.getApplicablePlans(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory)
+                        : optimizer.getBestPlan(dataPlanToBe, statement, select, resolverToBe, Collections.<PColumn>emptyList(), parallelIteratorFactory));
                 if (mayHaveImmutableIndexes) { // FIXME: this is ugly
                     // Lookup the table being deleted from in the cache, as it's possible that the
                     // optimizer updated the cache if it found indexes that were out of date.
@@ -538,11 +543,12 @@ public class DeleteCompiler {
                         ImmutableBytesWritable ptr = context.getTempPtr();
                         PTable table = tableRef.getTable();
                         table.getIndexMaintainers(ptr, context.getConnection());
+                        byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
                         ServerCache cache = null;
                         try {
                             if (ptr.getLength() > 0) {
                                 IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                                cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
                                 byte[] uuidValue = cache.getId();
                                 context.getScan().setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                             }

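Two client-visible changes land in DeleteCompiler here: a connection carrying an SCN (CURRENT_SCN) can no longer delete from a transactional table, and the encoded transaction state travels to the server alongside the index metadata cache. A minimal sketch of how the first restriction surfaces through JDBC, assuming a table that was created with TRANSACTIONAL=true (the table name and connection URL below are illustrative):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.Properties;

    import org.apache.phoenix.util.PhoenixRuntime;

    public class ScnOnTxnTableSketch {
        public static void main(String[] args) throws SQLException {
            Properties props = new Properties();
            // pin the connection to a point in time (SCN)
            props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(System.currentTimeMillis()));
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                // MY_TXN_TABLE is assumed to have been created with TRANSACTIONAL=true
                conn.createStatement().execute("DELETE FROM MY_TXN_TABLE WHERE K = 'a'");
            } catch (SQLException e) {
                // expected: CANNOT_SPECIFY_SCN_FOR_TXN_TABLE
                System.out.println("rejected with error code " + e.getErrorCode());
            }
        }
    }
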
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 426556f..0828b94 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -80,6 +80,7 @@ import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -226,7 +227,7 @@ public class FromCompiler {
         PTable t = PTableImpl.makePTable(table, projectedColumns);
         return new SingleTableColumnResolver(connection, new TableRef(tableRef.getTableAlias(), t, tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
     }
-
+    
     public static ColumnResolver getResolver(TableRef tableRef)
             throws SQLException {
         SingleTableColumnResolver visitor = new SingleTableColumnResolver(tableRef);
@@ -407,7 +408,7 @@ public class FromCompiler {
             PTable theTable = null;
             if (updateCacheImmediately || connection.getAutoCommit()) {
                 MetaDataMutationResult result = client.updateCache(schemaName, tableName);
-                timeStamp = result.getMutationTime();
+                timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
                 theTable = result.getTable();
                 if (theTable == null) {
                     throw new TableNotFoundException(schemaName, tableName, timeStamp);
@@ -427,7 +428,7 @@ public class FromCompiler {
                 if (theTable == null) {
                     MetaDataMutationResult result = client.updateCache(schemaName, tableName);
                     if (result.wasUpdated()) {
-                        timeStamp = result.getMutationTime();
+                    	timeStamp = TransactionUtil.getResolvedTimestamp(connection, result);
                         theTable = result.getTable();
                     }
                 }
@@ -651,7 +652,7 @@ public class FromCompiler {
                     PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM,
                     null, null, columns, null, null, Collections.<PTable>emptyList(),
                     false, Collections.<PName>emptyList(), null, null, false, false, false, null,
-                    null, null, false);
+                    null, null, false, false);
 
             String alias = subselectNode.getAlias();
             TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);

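FromCompiler now derives the table reference's timestamp from TransactionUtil.getResolvedTimestamp rather than the raw mutation time of the metadata RPC. The intent, sketched below with an illustrative helper (this is not the TransactionUtil in the patch): while a transaction is open, metadata is resolved at the transaction's read time so every statement in that transaction sees one schema snapshot; without a transaction, the server's mutation time is used as before.

    final class ResolvedTimestampSketch {
        // txReadTimeMillis: read time of the open transaction, or null if none is active
        static long getResolvedTimestamp(Long txReadTimeMillis, long mutationTimeMillis) {
            return txReadTimeMillis != null ? txReadTimeMillis : mutationTimeMillis;
        }

        public static void main(String[] args) {
            System.out.println(getResolvedTimestamp(null, 1000L));   // 1000 (no transaction)
            System.out.println(getResolvedTimestamp(900L, 1000L));   // 900  (transaction snapshot)
        }
    }
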
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index e4ca5d9..b55e4aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -1302,7 +1302,7 @@ public class JoinCompiler {
                 left.getBucketNum(), merged,left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(),
                 left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL,
                 left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(),
-                left.rowKeyOrderOptimizable());
+                left.rowKeyOrderOptimizable(), left.isTransactional());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index c6f6bf2..506623b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -48,6 +48,7 @@ import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
 
@@ -179,7 +180,14 @@ public class PostDDLCompiler {
                         };
                         PhoenixStatement statement = new PhoenixStatement(connection);
                         StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
-                        ScanUtil.setTimeRange(scan, timestamp);
+                        long ts = timestamp;
+                        // FIXME: DDL operations aren't transactional, so we're basing the timestamp on a server timestamp.
+                        // Not sure what the fix should be. We don't need conflict detection nor filtering of invalid transactions
+                        // in this case, so maybe this is ok.
+                        if (tableRef.getTable().isTransactional()) {
+                            ts = TransactionUtil.convertToNanoseconds(ts);
+                        }
+                        ScanUtil.setTimeRange(scan, ts);
                         if (emptyCF != null) {
                             scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, emptyCF);
                         }

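The FIXME above converts the scan's upper time bound for transactional tables because Tephra cell timestamps are not wall-clock milliseconds but transaction ids, which pack on the order of a million ids into each millisecond. A sketch of the conversion, with the scale factor stated as an assumption (the real helper is org.apache.phoenix.util.TransactionUtil.convertToNanoseconds):

    final class TimeScaleSketch {
        private static final long TX_IDS_PER_MS = 1_000_000L;   // assumed Tephra scale factor

        static long convertToNanoseconds(long millis) {
            return millis * TX_IDS_PER_MS;
        }

        public static void main(String[] args) {
            // an upper scan bound of "now", expressed in the transactional timestamp space
            System.out.println(convertToNanoseconds(System.currentTimeMillis()));
        }
    }
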
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 312de45..80c4b89 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -252,7 +252,8 @@ public class StatementContext {
 
     public long getCurrentTime() throws SQLException {
         long ts = this.getCurrentTable().getTimeStamp();
-        if (ts != QueryConstants.UNSET_TIMESTAMP) {
+        // if the table is transactional then it is only resolved once per query, so we can't use the table timestamp
+        if (!this.getCurrentTable().getTable().isTransactional() && ts != QueryConstants.UNSET_TIMESTAMP) {
             return ts;
         }
         if (currentTime != QueryConstants.UNSET_TIMESTAMP) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index c6aa546..551b05c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@ -152,7 +152,7 @@ public class TupleProjectionCompiler {
                 table.getBucketNum(), projectedColumns, table.getParentSchemaName(),
                 table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
                 table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                table.getIndexType(), table.rowKeyOrderOptimizable());
+                table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional());
     }
 
     public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException {
@@ -179,7 +179,7 @@ public class TupleProjectionCompiler {
                     retainPKColumns ? table.getBucketNum() : null, projectedColumns, null,
                     null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
                     table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(),
-                    null, table.rowKeyOrderOptimizable());
+                    null, table.rowKeyOrderOptimizable(), table.isTransactional());
     }
 
     // For extracting column references from single select statement

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index afca97a..298303d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -82,7 +82,7 @@ public class UnionCompiler {
         PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME, 
                 PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null,
                         projectedColumns, null, null, null,
-                        true, null, null, null, true, true, true, null, null, null, false);
+                        true, null, null, null, true, true, true, null, null, null, false, false);
         TableRef tableRef = new TableRef(null, tempTable, 0, false);
         return tableRef;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 953eb2f..0f7f6f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -182,7 +182,7 @@ public class UpsertCompiler {
                 if (isAutoCommit && rowCount % batchSize == 0) {
                     MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection);
                     connection.getMutationState().join(state);
-                    connection.commit();
+                    connection.getMutationState().send();
                     mutation.clear();
                 }
             }
@@ -292,9 +292,13 @@ public class UpsertCompiler {
                 // Cannot update:
                 // - read-only VIEW 
                 // - transactional table with a connection having an SCN 
-                if ( table.getType() == PTableType.VIEW && table.getViewType().isReadOnly() ) {
+                if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) {
                     throw new ReadOnlyTableException(schemaName,tableName);
                 }
+                else if (table.isTransactional() && connection.getSCN() != null) {
+                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName)
+                    .setTableName(tableName).build().buildException();
+                }
                 boolean isSalted = table.getBucketNum() != null;
                 isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null;
                 isSharedViewIndex = table.getViewIndexId() != null;
@@ -463,7 +467,7 @@ public class UpsertCompiler {
                         // so we might be able to run it entirely on the server side.
                         // For a table with row timestamp column, we can't guarantee that the row key will reside in the
                         // region space managed by region servers. So we bail out on executing on server side.
-                        runOnServer = sameTable && isAutoCommit && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1;
+                        runOnServer = sameTable && isAutoCommit && !table.isTransactional() && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && table.getRowTimestampColPos() == -1;
                     }
                     // If we may be able to run on the server, add a hint that favors using the data table
                     // if all else is equal.
@@ -678,12 +682,13 @@ public class UpsertCompiler {
                             ImmutableBytesWritable ptr = context.getTempPtr();
                             PTable table = tableRef.getTable();
                             table.getIndexMaintainers(ptr, context.getConnection());
+                            byte[] txState = table.isTransactional() ? connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY;
 
                             ServerCache cache = null;
                             try {
                                 if (ptr.getLength() > 0) {
                                     IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef);
-                                    cache = client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                                    cache = client.addIndexMetadataCache(context.getScanRanges(), ptr, txState);
                                     byte[] uuidValue = cache.getId();
                                     scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                                 }

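For UPSERT SELECT under autocommit, per-batch flushes now go through MutationState.send() instead of Connection.commit(), so a transactional target is not committed every batchSize rows; the mutations are shipped but remain part of the statement's transaction. A sketch of the client-visible expectation under that reading (table names and URL are illustrative, and the atomicity noted in the comment is the intent inferred from the change, not something shown in this hunk):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class UpsertSelectTxnSketch {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(true);
                // TARGET_TXN is assumed to have been created with TRANSACTIONAL=true
                conn.createStatement().execute("UPSERT INTO TARGET_TXN SELECT * FROM SOURCE");
                // other connections should see either none or all of the copied rows,
                // rather than batchSize-row increments
            }
        }
    }
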
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 33218ee..d720806 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -57,6 +57,8 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
+import co.cask.tephra.Transaction;
+
 import com.google.common.collect.ImmutableList;
 
 
@@ -86,6 +88,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey";
     public static final String REVERSE_SCAN = "_ReverseScan";
     public static final String ANALYZE_TABLE = "_ANALYZETABLE";
+    public static final String TX_STATE = "_TxState";
     public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES";
     public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION";
     public static final String UPGRADE_DESC_ROW_KEY = "_UPGRADE_DESC_ROW_KEY";
@@ -228,7 +231,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             final byte[][] viewConstants, final TupleProjector projector,
             final ImmutableBytesWritable ptr) {
         return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector,
-                dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr);
+                dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr);
     }
 
     /**
@@ -236,13 +239,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
      * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query
      * for 30 seconds. Unfortunately, until HBASE-7481 gets fixed, there's no way to do
      * the same from a custom filter.
-     * @param arrayFuncRefs
      * @param arrayKVRefs
+     * @param arrayFuncRefs
      * @param offset starting position in the rowkey.
      * @param scan
      * @param tupleProjector
      * @param dataRegion
      * @param indexMaintainer
+     * @param tx current transaction
      * @param viewConstants
      */
     protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -250,6 +254,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
             final Expression[] arrayFuncRefs, final int offset, final Scan scan,
             final ColumnReference[] dataColumns, final TupleProjector tupleProjector,
             final Region dataRegion, final IndexMaintainer indexMaintainer,
+            Transaction tx, 
             final byte[][] viewConstants, final KeyValueSchema kvSchema,
             final ValueBitSet kvSchemaBitSet, final TupleProjector projector,
             final ImmutableBytesWritable ptr) {

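The new TX_STATE constant names the scan attribute ("_TxState") under which the client ships the serialized transaction to the region server, and getWrappedScanner now accepts the decoded Transaction. A minimal sketch of both halves of that attribute handshake using plain HBase Scan attributes; how the byte[] is actually encoded and decoded (for example via Tephra's TransactionCodec) is not shown in this hunk and is left as an assumption here:

    import org.apache.hadoop.hbase.client.Scan;

    public class TxStateAttributeSketch {
        private static final String TX_STATE = "_TxState";   // value taken from the patch

        // client side: attach the serialized transaction state to the scan
        static Scan attachTxState(Scan scan, byte[] encodedTx) {
            if (encodedTx != null && encodedTx.length > 0) {
                scan.setAttribute(TX_STATE, encodedTx);
            }
            return scan;
        }

        // server side (coprocessor): read it back; null means a non-transactional scan
        static byte[] readTxState(Scan scan) {
            return scan.getAttribute(TX_STATE);
        }
    }
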
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 7eb1dc6..0a313be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -60,6 +60,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME_INDEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID_INDEX;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
@@ -240,6 +241,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final KeyValue EMPTY_KEYVALUE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES);
     private static final KeyValue BASE_COLUMN_COUNT_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.BASE_COLUMN_COUNT_BYTES);
     private static final KeyValue ROW_KEY_ORDER_OPTIMIZABLE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, ROW_KEY_ORDER_OPTIMIZABLE_BYTES);
+    private static final KeyValue TRANSACTIONAL_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TRANSACTIONAL_BYTES);
     
     private static final List<KeyValue> TABLE_KV_COLUMNS = Arrays.<KeyValue>asList(
             EMPTY_KEYVALUE_KV,
@@ -261,7 +263,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
             INDEX_DISABLE_TIMESTAMP_KV,
             STORE_NULLS_KV,
             BASE_COLUMN_COUNT_KV,
-            ROW_KEY_ORDER_OPTIMIZABLE_KV
+            ROW_KEY_ORDER_OPTIMIZABLE_KV,
+            TRANSACTIONAL_KV
             );
     static {
         Collections.sort(TABLE_KV_COLUMNS, KeyValue.COMPARATOR);
@@ -285,6 +288,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
     private static final int STORE_NULLS_INDEX = TABLE_KV_COLUMNS.indexOf(STORE_NULLS_KV);
     private static final int BASE_COLUMN_COUNT_INDEX = TABLE_KV_COLUMNS.indexOf(BASE_COLUMN_COUNT_KV);
     private static final int ROW_KEY_ORDER_OPTIMIZABLE_INDEX = TABLE_KV_COLUMNS.indexOf(ROW_KEY_ORDER_OPTIMIZABLE_KV);
+    private static final int TRANSACTIONAL_INDEX = TABLE_KV_COLUMNS.indexOf(TRANSACTIONAL_KV);
 
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
@@ -789,6 +793,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         boolean multiTenant = multiTenantKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(multiTenantKv.getValueArray(), multiTenantKv.getValueOffset(), multiTenantKv.getValueLength()));
         Cell storeNullsKv = tableKeyValues[STORE_NULLS_INDEX];
         boolean storeNulls = storeNullsKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(storeNullsKv.getValueArray(), storeNullsKv.getValueOffset(), storeNullsKv.getValueLength()));
+        Cell transactionalKv = tableKeyValues[TRANSACTIONAL_INDEX];
+        boolean transactional = transactionalKv == null ? false : Boolean.TRUE.equals(PBoolean.INSTANCE.toObject(transactionalKv.getValueArray(), transactionalKv.getValueOffset(), transactionalKv.getValueLength()));
         Cell viewTypeKv = tableKeyValues[VIEW_TYPE_INDEX];
         ViewType viewType = viewTypeKv == null ? null : ViewType.fromSerializedValue(viewTypeKv.getValueArray()[viewTypeKv.getValueOffset()]);
         Cell viewIndexIdKv = tableKeyValues[VIEW_INDEX_ID_INDEX];
@@ -844,7 +850,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
         return PTableImpl.makePTable(tenantId, schemaName, tableName, tableType, indexState, timeStamp,
             tableSeqNum, pkName, saltBucketNum, columns, tableType == INDEX ? schemaName : null,
             tableType == INDEX ? dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement,
-            disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable);
+            disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, stats, baseColumnCount, rowKeyOrderOptimizable, transactional);
     }
 
     private PFunction getFunction(RegionScanner scanner, final boolean isReplace, long clientTimeStamp, List<Mutation> deleteMutationsForReplace)
@@ -2332,6 +2338,17 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso
                                 }
                                 continue;
                             }
+                        } else if (pkCount == COLUMN_NAME_INDEX &&
+                                   ! (Bytes.compareTo(schemaName, rowKeyMetaData[SCHEMA_NAME_INDEX]) == 0 &&
+                                      Bytes.compareTo(tableName, rowKeyMetaData[TABLE_NAME_INDEX]) == 0 ) ) {
+                            // Invalidate any table with mutations
+                            // TODO: this likely means we don't need the above logic that
+                            // loops through the indexes if adding a PK column, since we'd
+                            // always have header rows for those.
+                            invalidateList.add(new ImmutableBytesPtr(SchemaUtil
+                                    .getTableKey(tenantId, 
+                                            rowKeyMetaData[SCHEMA_NAME_INDEX],
+                                            rowKeyMetaData[TABLE_NAME_INDEX])));
                         }
                     }
                     tableMetaData.addAll(mutationsForAddingColumnsToViews);

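MetaDataEndpointImpl now reads a TRANSACTIONAL cell from the table's header row in SYSTEM.CATALOG and carries the flag into PTableImpl.makePTable, alongside the new invalidation of any other table touched by the mutation batch. From the client side the flag is set with the TRANSACTIONAL=true table property; the sketch below creates such a table and reads the flag back, where the SYSTEM.CATALOG predicate for the header row (null column name and family) is an assumption for illustration:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class TransactionalFlagSketch {
        public static void main(String[] args) throws SQLException {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.createStatement().execute(
                    "CREATE TABLE IF NOT EXISTS DEMO_TXN (K VARCHAR PRIMARY KEY, V VARCHAR) TRANSACTIONAL=true");
                ResultSet rs = conn.createStatement().executeQuery(
                    "SELECT TRANSACTIONAL FROM SYSTEM.CATALOG " +
                    "WHERE TABLE_NAME = 'DEMO_TXN' AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL");
                while (rs.next()) {
                    System.out.println("transactional = " + rs.getBoolean(1));
                }
            }
        }
    }
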
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 7cc4123..ba06828 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -23,9 +23,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos;
-import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
+import org.apache.phoenix.coprocessor.generated.PFunctionProtos;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.schema.PColumn;
@@ -73,7 +73,9 @@ public abstract class MetaDataProtocol extends MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_3_0 = MIN_TABLE_TIMESTAMP + 7;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_5_0 = MIN_TABLE_TIMESTAMP + 8;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0 = MIN_TABLE_TIMESTAMP + 9;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_6_0;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0 = MIN_TABLE_TIMESTAMP + 10;
+    // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_7_0;
     // TODO: pare this down to minimum, as we don't need duplicates for both table and column errors, nor should we need
     // a different code for every type of error.
     // ENTITY_ALREADY_EXISTS, ENTITY_NOT_FOUND, NEWER_ENTITY_FOUND, ENTITY_NOT_IN_REGION, CONCURRENT_MODIFICATION