You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/10/03 23:38:23 UTC

[3/3] phoenix git commit: PHOENIX-914 Native HBase timestamp support to optimize date range queries in Phoenix

PHOENIX-914 Native HBase timestamp support to optimize date range queries in Phoenix


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/72e7ccd1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/72e7ccd1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/72e7ccd1

Branch: refs/heads/4.x-HBase-0.98
Commit: 72e7ccd17f7cbbaae4adaa85a23fd54a46315770
Parents: 911a968
Author: Samarth <sa...@salesforce.com>
Authored: Sat Oct 3 14:37:54 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Sat Oct 3 14:37:54 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/AlterTableIT.java    |  70 +++
 .../org/apache/phoenix/end2end/DeleteIT.java    |  63 +++
 .../apache/phoenix/end2end/UpsertSelectIT.java  | 531 +++++++++++++++++++
 .../apache/phoenix/end2end/UpsertValuesIT.java  | 284 ++++++++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |  27 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  10 +-
 .../apache/phoenix/compile/FromCompiler.java    |   6 +-
 .../phoenix/compile/ListJarsQueryPlan.java      |   2 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   3 +
 .../phoenix/compile/PostIndexDDLCompiler.java   |   5 +-
 .../org/apache/phoenix/compile/ScanRanges.java  | 110 +++-
 .../phoenix/compile/StatementContext.java       |  11 -
 .../apache/phoenix/compile/TraceQueryPlan.java  |   2 +-
 .../apache/phoenix/compile/UnionCompiler.java   |   2 +-
 .../apache/phoenix/compile/UpsertCompiler.java  | 120 ++++-
 .../apache/phoenix/compile/WhereOptimizer.java  |   2 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  14 +-
 .../phoenix/coprocessor/MetaDataProtocol.java   |   1 +
 .../coprocessor/generated/PTableProtos.java     | 142 ++++-
 .../phoenix/exception/SQLExceptionCode.java     |  11 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |  29 +-
 .../apache/phoenix/execute/MutationState.java   | 110 +++-
 .../apache/phoenix/iterate/ExplainTable.java    |   7 +
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  11 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   2 +
 .../phoenix/jdbc/PhoenixPreparedStatement.java  |   1 -
 .../org/apache/phoenix/parse/ColumnDef.java     |  11 +-
 .../phoenix/parse/ColumnDefInPkConstraint.java  |  44 ++
 .../apache/phoenix/parse/ParseNodeFactory.java  |  21 +-
 .../phoenix/parse/PrimaryKeyConstraint.java     |  48 +-
 .../query/ConnectionQueryServicesImpl.java      |   7 +-
 .../apache/phoenix/query/QueryConstants.java    |   3 +
 .../apache/phoenix/schema/DelegateColumn.java   |   5 +
 .../apache/phoenix/schema/DelegateTable.java    |   5 +
 .../apache/phoenix/schema/MetaDataClient.java   | 245 +++++----
 .../java/org/apache/phoenix/schema/PColumn.java |   5 +
 .../org/apache/phoenix/schema/PColumnImpl.java  |  19 +-
 .../java/org/apache/phoenix/schema/PDatum.java  |   1 +
 .../apache/phoenix/schema/PMetaDataImpl.java    |   2 +-
 .../java/org/apache/phoenix/schema/PTable.java  |   6 +
 .../org/apache/phoenix/schema/PTableImpl.java   |  16 +
 .../org/apache/phoenix/schema/SaltingUtil.java  |   2 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |  34 ++
 .../phoenix/compile/QueryCompilerTest.java      |  43 ++
 .../phoenix/compile/WhereOptimizerTest.java     |   2 +
 .../phoenix/execute/CorrelatePlanTest.java      |   2 +-
 .../phoenix/execute/UnnestArrayPlanTest.java    |   4 +-
 .../expression/ColumnExpressionTest.java        |   8 +-
 .../iterate/AggregateResultScannerTest.java     |   4 +
 .../apache/phoenix/jdbc/PhoenixDriverTest.java  |  16 +
 .../apache/phoenix/parse/QueryParserTest.java   |   2 +-
 phoenix-protocol/src/main/PTable.proto          |   1 +
 52 files changed, 1877 insertions(+), 255 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 4053301..dc47d99 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -23,6 +23,7 @@ import static org.apache.phoenix.util.TestUtil.closeConnection;
 import static org.apache.phoenix.util.TestUtil.closeStatement;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -1998,5 +1999,74 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    @Test
+    public void testDeclaringColumnAsRowTimestamp() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE T1 (PK1 DATE NOT NULL, PK2 VARCHAR NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1 ROW_TIMESTAMP, PK2)) ");
+            PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); 
+            PTable table = phxConn.getMetaDataCache().getTable(new PTableKey(phxConn.getTenantId(), "T1"));
+            // Assert that the column shows up as row time stamp in the cache.
+            assertTrue(table.getColumn("PK1").isRowTimestamp());
+            assertFalse(table.getColumn("PK2").isRowTimestamp());
+            assertIsRowTimestampSet("T1", "PK1");
+            
+            conn.createStatement().execute("CREATE TABLE T6 (PK1 VARCHAR, PK2 DATE PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR, KV2 INTEGER)");
+            table = phxConn.getMetaDataCache().getTable(new PTableKey(phxConn.getTenantId(), "T6"));
+            // Assert that the column shows up as row time stamp in the cache.
+            assertFalse(table.getColumn("PK1").isRowTimestamp());
+            assertTrue(table.getColumn("PK2").isRowTimestamp());
+            assertIsRowTimestampSet("T6", "PK2");
+            
+            // Create an index on a table has a row time stamp pk column. The column should show up as a row time stamp column for the index too. 
+            conn.createStatement().execute("CREATE INDEX T6_IDX ON T6 (KV1) include (KV2)");
+            PTable indexTable = phxConn.getMetaDataCache().getTable(new PTableKey(phxConn.getTenantId(), "T6_IDX"));
+            String indexColName = IndexUtil.getIndexColumnName(table.getColumn("PK2"));
+            // Assert that the column shows up as row time stamp in the cache.
+            assertTrue(indexTable.getColumn(indexColName).isRowTimestamp());
+            assertIsRowTimestampSet("T6_IDX", indexColName);
+            
+            // Creating a view with a row_timestamp column in its pk constraint is not allowed
+            try {
+                conn.createStatement().execute("CREATE VIEW T6_VIEW (KV3 VARCHAR, KV4 DATE, KV5 INTEGER, CONSTRAINT PK PRIMARY KEY (KV3, KV4 ROW_TIMESTAMP) ) AS SELECT * FROM T6");
+                fail("Creating a view with a row_timestamp column in its pk constraint is not allowed");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW.getErrorCode(), e.getErrorCode());
+            }
+            
+            // Make sure that the base table column declared as row_timestamp is also row_timestamp for view
+            conn.createStatement().execute("CREATE VIEW T6_VIEW (KV3 VARCHAR, KV4 VARCHAR, KV5 INTEGER, CONSTRAINT PK PRIMARY KEY (KV3, KV4) ) AS SELECT * FROM T6");
+            PTable view = phxConn.getMetaDataCache().getTable(new PTableKey(phxConn.getTenantId(), "T6_VIEW"));
+            assertNotNull(view.getPKColumn("PK2"));
+            assertTrue(view.getPKColumn("PK2").isRowTimestamp());
+        }
+    }
+    
+    private void assertIsRowTimestampSet(String tableName, String columnName) throws SQLException {
+        String sql = "SELECT IS_ROW_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM IS NULL AND TABLE_NAME = ? AND COLUMN_FAMILY IS NULL AND COLUMN_NAME = ?";
+        try(Connection conn = DriverManager.getConnection(getUrl())) {
+            PreparedStatement stmt = conn.prepareStatement(sql);
+            stmt.setString(1, tableName);
+            stmt.setString(2, columnName);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(true, rs.getBoolean(1));
+        }
+    }
+    
+    @Test
+    public void testAddingRowTimestampColumnNotAllowedViaAlterTable() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE T1 (PK1 VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2)) ");
+            // adding a new pk column that is also row_timestamp is not allowed
+            try {
+                conn.createStatement().execute("ALTER TABLE T1 ADD PK3 DATE PRIMARY KEY ROW_TIMESTAMP");
+                fail("Altering table to add a PK column as row_timestamp column should fail");
+            } catch (SQLException e) {
+                assertEquals(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY.getErrorCode(), e.getErrorCode());
+            }
+        }
+    }
+    
 }
  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 36dff6f..745c730 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -472,6 +472,69 @@ public class DeleteIT extends BaseHBaseManagedTimeIT {
             }
         }
     }
+    
+    @Test
+    public void testDeleteForTableWithRowTimestampColServer() throws Exception {
+        testDeleteForTableWithRowTimestampCol(true);
+    }
+    
+    @Test
+    public void testDeleteForTableWithRowTimestampColClient() throws Exception {
+        testDeleteForTableWithRowTimestampCol(false);
+    }
+    
+    private void testDeleteForTableWithRowTimestampCol(boolean autoCommit) throws Exception {
+        String tableName = "testDeleteForTableWithRowTimestampCol".toUpperCase();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.setAutoCommit(autoCommit);
+            Statement stm = conn.createStatement();
+            stm.execute("CREATE TABLE IF NOT EXISTS " + tableName +
+                    " (HOST CHAR(2) NOT NULL," +
+                    "STAT_DATE DATE NOT NULL, \n" + 
+                    "USAGE.CORE BIGINT," +
+                    "USAGE.DB BIGINT," +
+                    "CONSTRAINT PK PRIMARY KEY (HOST, STAT_DATE ROW_TIMESTAMP))");
+            stm.close();
+
+            PreparedStatement psInsert = conn
+                    .prepareStatement("UPSERT INTO " + tableName + " (HOST, STAT_DATE, CORE, DB) VALUES(?,?,?,?)");
+            psInsert.setString(1, "AA");
+            psInsert.setDate(2, new Date(100));
+            psInsert.setLong(3, 1L);
+            psInsert.setLong(4, 2L);
+            psInsert.execute();
+            psInsert.close();
+            if (!autoCommit) {
+                conn.commit();
+            }
+            conn.createStatement().execute("DELETE FROM " + tableName);
+            if (!autoCommit) {
+                conn.commit();
+            }
+            ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tableName);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getLong(1));
+            
+            // try with value for row_timestamp column generated by phoenix 
+            psInsert = conn
+                    .prepareStatement("UPSERT INTO " + tableName + " (HOST, CORE, DB) VALUES(?,?,?)");
+            psInsert.setString(1, "BB");
+            psInsert.setLong(2, 1L);
+            psInsert.setLong(3, 2L);
+            psInsert.execute();
+            psInsert.close();
+            if (!autoCommit) {
+                conn.commit();
+            }
+            conn.createStatement().execute("DELETE FROM " + tableName);
+            if (!autoCommit) {
+                conn.commit();
+            }
+            rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tableName);
+            assertTrue(rs.next());
+            assertEquals(0, rs.getLong(1));
+        }
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index d319b4d..aca1602 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end;
 
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.util.TestUtil.A_VALUE;
 import static org.apache.phoenix.util.TestUtil.B_VALUE;
@@ -33,6 +34,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.math.BigDecimal;
 import java.sql.Connection;
@@ -40,9 +42,13 @@ import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.types.PInteger;
@@ -800,4 +806,529 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
         conn.close();
 
     }
+    
+    @Test
+    public void testUpsertSelectWithRowtimeStampColumn() throws Exception {
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE T1 (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
+            conn.createStatement().execute("CREATE TABLE T2 (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
+            conn.createStatement().execute("CREATE TABLE T3 (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
+        }
+        
+        // Upsert data with scn set on the connection. However, the timestamp of the put will be the value of the row_timestamp column.
+        ts = nextTimestamp();
+        long rowTimestamp = 1000000;
+        Date rowTimestampDate = new Date(rowTimestamp);
+        try (Connection conn = getConnection(ts)) {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO T1 (PK1, PK2, KV1) VALUES(?, ?, ?)");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, rowTimestampDate);
+            stmt.setString(3, "KV1");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        
+        // Upsert select data into table T2. The connection needs to be at a timestamp beyond the row timestamp. Otherwise 
+        // it won't see the data from table T1.
+        try (Connection conn = getConnection(rowTimestamp + 5)) {
+            conn.createStatement().executeUpdate("UPSERT INTO T2 SELECT * FROM T1");
+            conn.commit();
+            // Verify the data upserted in T2. Note that we can use the same connection here because the data was
+            // inserted with a timestamp of rowTimestamp and the connection is at rowTimestamp + 5.
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM T2 WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, rowTimestampDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("PK1", rs.getString("PK1"));
+            assertEquals(rowTimestampDate, rs.getDate("PK2"));
+            assertEquals("KV1", rs.getString("KV1"));
+        }
+        
+        // Verify that you can't see the data in T2 if the connection is at next timestamp (which is lower than the row timestamp).
+        try (Connection conn = getConnection(nextTimestamp())) {
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM T2 WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, rowTimestampDate);
+            ResultSet rs = stmt.executeQuery();
+            assertFalse(rs.next());
+        }
+        
+        // Upsert select data into table T3. The connection needs to be at a timestamp beyond the row timestamp. Otherwise 
+        // it won't see the data from table T1.
+        try (Connection conn = getConnection(rowTimestamp + 5)) {
+            conn.createStatement().executeUpdate("UPSERT INTO T3 SELECT * FROM T2");
+            conn.commit();
+            // Verify the data upserted in T3. Note that we can use the same connection here because the data was
+            // inserted with a timestamp of rowTimestamp and the connection is at rowTimestamp + 5.
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM T3 WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, rowTimestampDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("PK1", rs.getString("PK1"));
+            assertEquals(rowTimestampDate, rs.getDate("PK2"));
+            assertEquals("KV1", rs.getString("KV1"));
+        }
+        
+        // Verify that you can't see the data in T2 if the connection is at next timestamp (which is lower than the row timestamp).
+        try (Connection conn = getConnection(nextTimestamp())) {
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM T3 WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, rowTimestampDate);
+            ResultSet rs = stmt.executeQuery();
+            assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testUpsertSelectSameTableWithRowTimestampColumn() throws Exception {
+        String tableName = "testUpsertSelectSameTableWithRowTimestampColumn".toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
+        }
+
+        // Upsert data with scn set on the connection. The timestamp of the put will be the value of the row_timestamp column.
+        ts = nextTimestamp();
+        long rowTimestamp = ts + 100000;
+        Date rowTimestampDate = new Date(rowTimestamp);
+        try (Connection conn = getConnection(ts)) {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + tableName + " (PK1, PK2, KV1) VALUES(?, ?, ?)");
+            stmt.setInt(1, 1);
+            stmt.setDate(2, rowTimestampDate);
+            stmt.setString(3, "KV1");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+
+        try (Connection conn = getConnection(nextTimestamp())) {
+            conn.createStatement().execute("CREATE SEQUENCE T_SEQ");
+        }
+        // Upsert select data into table. The connection needs to be at a timestamp beyond the row timestamp. Otherwise 
+        // it won't see the data from table.
+        try (Connection conn = getConnection(rowTimestamp + 5)) {
+            conn.createStatement().executeUpdate("UPSERT INTO  " + tableName + "  SELECT NEXT VALUE FOR T_SEQ, PK2 FROM  " + tableName);
+            conn.commit();
+        }
+        
+        // Upsert select using sequences.
+        try (Connection conn = getConnection(rowTimestamp + 5)) {
+            conn.setAutoCommit(true);
+            for (int i = 0; i < 10; i++) {
+                int count = conn.createStatement().executeUpdate("UPSERT INTO  " + tableName + "  SELECT NEXT VALUE FOR T_SEQ, PK2 FROM  " + tableName);
+                assertEquals((int)Math.pow(2, i), count);
+            }
+        }
+    }
+    
+    @Test
+    public void testAutomaticallySettingRowtimestamp() throws Exception {
+        String table1 = "testAutomaticallySettingRowtimestamp1".toUpperCase();
+        String table2 = "testAutomaticallySettingRowtimestamp2".toUpperCase();
+        String table3 = "testAutomaticallySettingRowtimestamp3".toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE " + table1 + " (T1PK1 VARCHAR NOT NULL, T1PK2 DATE NOT NULL, T1KV1 VARCHAR, T1KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T1PK1, T1PK2 DESC ROW_TIMESTAMP)) ");
+            conn.createStatement().execute("CREATE TABLE " + table2 + " (T2PK1 VARCHAR NOT NULL, T2PK2 DATE NOT NULL, T2KV1 VARCHAR, T2KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T2PK1, T2PK2 ROW_TIMESTAMP)) ");
+            conn.createStatement().execute("CREATE TABLE " + table3 + " (T3PK1 VARCHAR NOT NULL, T3PK2 DATE NOT NULL, T3KV1 VARCHAR, T3KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T3PK1, T3PK2 DESC ROW_TIMESTAMP)) ");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Upsert values where row_timestamp column PK2 is not set and the column names are specified
+            // This should upsert data with the value for PK2 as new Date(ts);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table1 + " (T1PK1, T1KV1, T1KV2) VALUES (?, ?, ?)");
+            stmt.setString(1, "PK1");
+            stmt.setString(2, "KV1");
+            stmt.setString(3, "KV2");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        Date upsertedDate = new Date(ts);
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+            // the data in this query.
+            PreparedStatement stmt = conn.prepareStatement("SELECT T1KV1, T1KV2 FROM " + table1 + " WHERE T1PK1 = ? AND T1PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, upsertedDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString(1));
+            assertEquals("KV2", rs.getString(2));
+            assertFalse(rs.next());
+        }
+        
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Upsert select into table2 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the scn of the connection
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table2 + " (T2PK1, T2KV1, T2KV2) SELECT T1PK1, T1KV1, T1KV2 FROM " + table1);
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        
+        upsertedDate = new Date(ts);
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+            // the data in this query.
+            PreparedStatement stmt = conn.prepareStatement("SELECT T2KV1, T2KV2 FROM " + table2 + " WHERE T2PK1 = ? AND T2PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, upsertedDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString(1));
+            assertEquals("KV2", rs.getString(2));
+            assertFalse(rs.next());
+        }
+        
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Upsert select into table2 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the scn of the connection
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + table3 + " (T3PK1, T3KV1, T3KV2) SELECT T2PK1, T2KV1, T2KV2 FROM " + table2);
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        
+        upsertedDate = new Date(ts);
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+            // the data in this query.
+            PreparedStatement stmt = conn.prepareStatement("SELECT T3KV1, T3KV2 FROM " + table3 + " WHERE T3PK1 = ? AND T3PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, upsertedDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString(1));
+            assertEquals("KV2", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testUpsertSelectAutoCommitWithRowTimestampColumn() throws Exception {
+        String tableName1 = "testUpsertSelectServerSideWithRowTimestampColumn".toUpperCase();
+        String tableName2 = "testUpsertSelectServerSideWithRowTimestampColumn2".toUpperCase();
+        long ts = 10;
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE " + tableName1 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP, PK3)) ");
+            conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP, PK3)) ");
+        }
+
+        String[] tableNames = {tableName1, tableName2};
+        for (String tableName : tableNames) {
+            // Upsert data with scn set on the connection. The timestamp of the put will be the value of the row_timestamp column.
+            long rowTimestamp1 = 100;
+            Date rowTimestampDate = new Date(rowTimestamp1);
+            try (Connection conn = getConnection(ts)) {
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + tableName + " (PK1, PK2, PK3, KV1) VALUES(?, ?, ?, ?)");
+                stmt.setInt(1, 1);
+                stmt.setDate(2, rowTimestampDate);
+                stmt.setInt(3, 3);
+                stmt.setString(4, "KV1");
+                stmt.executeUpdate();
+                conn.commit();
+            }
+
+            long rowTimestamp2 = 2 * rowTimestamp1;
+            try (Connection conn = getConnection(rowTimestamp2)) {
+                conn.setAutoCommit(true);
+                // Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
+                // creating a new row whose timestamp is the SCN of the connection. The same SCN will be used
+                // for the row key too.
+                conn.createStatement().executeUpdate("UPSERT INTO  " + tableName + " (PK1, PK3, KV1) SELECT PK1, PK3, KV1 FROM  " + tableName);
+            }
+            try (Connection conn = getConnection(3 * rowTimestamp1)) {
+                // Verify the row that was upserted above
+                PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ?");
+                stmt.setInt(1, 1);
+                stmt.setDate(2, new Date(rowTimestamp2));
+                stmt.setInt(3, 3);
+                ResultSet rs = stmt.executeQuery();
+                assertTrue(rs.next());
+                assertEquals(1, rs.getInt("PK1"));
+                assertEquals(3, rs.getInt("PK3"));
+                assertEquals("KV1", rs.getString("KV1"));
+                assertEquals(new Date(rowTimestamp2), rs.getDate("PK2"));
+                assertFalse(rs.next());
+                // Number of rows in the table should be 2.
+                rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+
+            }
+            ts = 5 * rowTimestamp1;
+            try (Connection conn = getConnection(ts)) {
+                conn.setAutoCommit(true);
+                // Upsert select in the same table with the row_timestamp column PK2 specified. This will not end up creating a new row
+                // because the destination pk columns, including the row timestamp column PK2, are the same as the source column.
+                conn.createStatement().executeUpdate("UPSERT INTO  " + tableName + " (PK1, PK2, PK3, KV1) SELECT PK1, PK2, PK3, KV1 FROM  " + tableName);
+            }
+            ts = 6 * rowTimestamp1;
+            try (Connection conn = getConnection(ts)) {
+                // Verify that two rows were created. One with rowtimestamp1 and the other with rowtimestamp2
+                ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+                assertTrue(rs.next());
+                assertEquals(2, rs.getInt(1));
+                assertFalse(rs.next());
+            }
+            
+        }
+    }
+    
+    @Test
+    public void testRowTimestampColWithViewsIndexesAndSaltedTables() throws Exception {
+        String baseTable = "testRowTimestampColWithViewsIndexesAndSaltedTables".toUpperCase();
+        String tenantView = "tenatView".toUpperCase();
+        String globalView = "globalView".toUpperCase();
+        String baseTableIdx = "table_idx".toUpperCase();
+        String tenantViewIdx = "tenantView_idx".toUpperCase();
+
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE " + baseTable + " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR, KV2 VARCHAR, KV3 VARCHAR CONSTRAINT PK PRIMARY KEY(TENANT_ID, PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT = true, SALT_BUCKETS = 8");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE INDEX " + baseTableIdx + " ON " + baseTable + " (PK2, KV3) INCLUDE (KV1)");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE VIEW " + globalView + " AS SELECT * FROM " + baseTable + " WHERE KV1 = 'KV1'");
+        }
+        String tenantId = "tenant1";
+        ts = nextTimestamp();
+        try (Connection conn = getTenantConnection(tenantId, ts)) {
+            conn.createStatement().execute("CREATE VIEW " + tenantView + " AS SELECT * FROM " + baseTable);
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getTenantConnection(tenantId, ts)) {
+            conn.createStatement().execute("CREATE INDEX " + tenantViewIdx + " ON " + tenantView + " (PK2, KV2) INCLUDE (KV1)");
+        }
+
+        // upsert data into base table without specifying the row timestamp column PK2
+        long upsertedTs = 5;
+        try (Connection conn = getConnection(upsertedTs)) {
+            // Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
+            // creating a new row whose timestamp is the SCN of the connection. The same SCN will be used
+            // for the row key too.
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + baseTable + " (TENANT_ID, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)");
+            stmt.setString(1, tenantId);
+            stmt.setInt(2, 3);
+            stmt.setString(3, "KV1");
+            stmt.setString(4, "KV2");
+            stmt.setString(5, "KV3");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+
+        // Verify that we can see data when querying through base table, global view and index on the base table
+        try (Connection conn = getConnection(nextTimestamp())) {
+            // Query the base table
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + baseTable + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ?");
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, new Date(upsertedTs));
+            stmt.setInt(3, 3);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(tenantId, rs.getString("TENANT_ID"));
+            assertEquals("KV1", rs.getString("KV1"));
+            assertEquals("KV2", rs.getString("KV2"));
+            assertEquals("KV3", rs.getString("KV3"));
+            assertEquals(new Date(upsertedTs), rs.getDate("PK2"));
+            assertFalse(rs.next());
+
+            // Query the globalView
+            stmt = conn.prepareStatement("SELECT * FROM  " + globalView + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ?");
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, new Date(upsertedTs));
+            stmt.setInt(3, 3);
+            rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(tenantId, rs.getString("TENANT_ID"));
+            assertEquals("KV1", rs.getString("KV1"));
+            assertEquals("KV2", rs.getString("KV2"));
+            assertEquals("KV3", rs.getString("KV3"));
+            assertEquals(new Date(upsertedTs), rs.getDate("PK2"));
+            assertFalse(rs.next());
+
+            // Query using the index on base table
+            stmt = conn.prepareStatement("SELECT KV1 FROM  " + baseTable + " WHERE PK2 = ? AND KV3 = ?");
+            stmt.setDate(1, new Date(upsertedTs));
+            stmt.setString(2, "KV3");
+            rs = stmt.executeQuery();
+            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+            assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString("KV1"));
+            assertFalse(rs.next());
+        }
+
+        // Verify that data can be queried using tenant view and tenant view index
+        try (Connection tenantConn = getTenantConnection(tenantId, nextTimestamp())) {
+            // Query the tenant view
+            PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM  " + tenantView + " WHERE PK2 = ? AND PK3 = ?");
+            stmt.setDate(1, new Date(upsertedTs));
+            stmt.setInt(2, 3);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString("KV1"));
+            assertEquals("KV2", rs.getString("KV2"));
+            assertEquals("KV3", rs.getString("KV3"));
+            assertEquals(new Date(upsertedTs), rs.getDate("PK2"));
+            assertFalse(rs.next());
+
+            // Query using the index on the tenantView
+            //TODO: uncomment the code after PHOENIX-2277 is fixed
+//            stmt = tenantConn.prepareStatement("SELECT KV1 FROM  " + tenantView + " WHERE PK2 = ? AND KV2 = ?");
+//            stmt.setDate(1, new Date(upsertedTs));
+//            stmt.setString(2, "KV2");
+//            rs = stmt.executeQuery();
+//            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+//            assertTrue(plan.getTableRef().getTable().getName().getString().equals(tenantViewIdx));
+//            assertTrue(rs.next());
+//            assertEquals("KV1", rs.getString("KV1"));
+//            assertFalse(rs.next());
+        }
+
+        upsertedTs = nextTimestamp();
+        try (Connection tenantConn = getTenantConnection(tenantId, upsertedTs)) {
+            // Upsert into tenant view where the row_timestamp column PK2 is not specified
+            PreparedStatement stmt = tenantConn.prepareStatement("UPSERT INTO  " + tenantView + " (PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?)");
+            stmt.setInt(1, 33);
+            stmt.setString(2, "KV13");
+            stmt.setString(3, "KV23");
+            stmt.setString(4, "KV33");
+            stmt.executeUpdate();
+            tenantConn.commit();
+            // Upsert into tenant view where the row_timestamp column PK2 is specified
+            stmt = tenantConn.prepareStatement("UPSERT INTO  " + tenantView + " (PK2, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)");
+            stmt.setDate(1, new Date(upsertedTs));
+            stmt.setInt(2, 44);
+            stmt.setString(3, "KV14");
+            stmt.setString(4, "KV24");
+            stmt.setString(5, "KV34");
+            stmt.executeUpdate();
+            tenantConn.commit();
+        }
+
+        // Verify that the data upserted using the tenant view can now be queried using base table and the base table index
+        try (Connection conn = getConnection(upsertedTs + 10000)) {
+            // Query the base table
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + baseTable + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ? ");
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, new Date(upsertedTs));
+            stmt.setInt(3, 33);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(tenantId, rs.getString("TENANT_ID"));
+            assertEquals("KV13", rs.getString("KV1"));
+            assertEquals("KV23", rs.getString("KV2"));
+            assertEquals("KV33", rs.getString("KV3"));
+            assertFalse(rs.next());
+            
+            stmt = conn.prepareStatement("SELECT * FROM  " + baseTable + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ? ");
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, new Date(upsertedTs));
+            stmt.setInt(3, 44);
+            rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(tenantId, rs.getString("TENANT_ID"));
+            assertEquals("KV14", rs.getString("KV1"));
+            assertEquals("KV24", rs.getString("KV2"));
+            assertEquals("KV34", rs.getString("KV3"));
+            assertFalse(rs.next());
+
+            // Query using the index on base table
+            stmt = conn.prepareStatement("SELECT KV1 FROM  " + baseTable + " WHERE (PK2, KV3) IN ((?, ?), (?, ?)) ORDER BY KV1");
+            stmt.setDate(1, new Date(upsertedTs));
+            stmt.setString(2, "KV33");
+            stmt.setDate(3, new Date(upsertedTs));
+            stmt.setString(4, "KV34");
+            rs = stmt.executeQuery();
+            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+            assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
+            assertTrue(rs.next());
+            assertEquals("KV13", rs.getString("KV1"));
+            assertTrue(rs.next());
+            assertEquals("KV14", rs.getString("KV1"));
+            assertFalse(rs.next());
+        }
+        
+        // Verify that the data upserted using the tenant view can now be queried using tenant view
+        try (Connection tenantConn = getTenantConnection(tenantId, upsertedTs + 10000)) {
+            // Query the base table
+            PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM  " + tenantView + " WHERE (PK2, PK3) IN ((?, ?), (?, ?)) ORDER BY KV1");
+            stmt.setDate(1, new Date(upsertedTs));
+            stmt.setInt(2, 33);
+            stmt.setDate(3, new Date(upsertedTs));
+            stmt.setInt(4, 44);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV13", rs.getString("KV1"));
+            assertTrue(rs.next());
+            assertEquals("KV14", rs.getString("KV1"));
+            assertFalse(rs.next());
+            
+            //TODO: uncomment the code after PHOENIX-2277 is fixed
+//            // Query using the index on the tenantView
+//            stmt = tenantConn.prepareStatement("SELECT KV1 FROM  " + tenantView + " WHERE (PK2, KV2) IN (?, ?, ?, ?) ORDER BY KV1");
+//            stmt.setDate(1, new Date(upsertedTs));
+//            stmt.setString(2, "KV23");
+//            stmt.setDate(3, new Date(upsertedTs));
+//            stmt.setString(4, "KV24");
+//            rs = stmt.executeQuery();
+//            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+//            assertTrue(plan.getTableRef().getTable().getName().getString().equals(tenantViewIdx));
+//            assertTrue(rs.next());
+//            assertEquals("KV13", rs.getString("KV1"));
+//            assertTrue(rs.next());
+//            assertEquals("KV14", rs.getString("KV1"));
+//            assertFalse(rs.next());
+        }
+    }
+        
+    @Test
+    public void testDisallowNegativeValuesForRowTsColumn() throws Exception {
+        String tableName = "testDisallowNegativeValuesForRowTsColumn".toUpperCase();
+        String tableName2 = "testDisallowNegativeValuesForRowTsColumn2".toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+            conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            long upsertedTs = 100;
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName +  " VALUES (?, ?)");
+            stmt.setLong(1, upsertedTs);
+            stmt.setString(2, "KV1");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName2 +  " SELECT (PK1 - 500), KV1 FROM " + tableName);
+            stmt.executeUpdate();
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.ILLEGAL_DATA.getErrorCode(), e.getErrorCode());
+        }
+    }
+    
+    private static Connection getConnection(long ts) throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        return DriverManager.getConnection(getUrl(), props);
+    }
+    
+    private static Connection getTenantConnection(String tenantId, long ts) throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        props.setProperty(TENANT_ID_ATTRIB, tenantId);
+        return DriverManager.getConnection(getUrl(), props);
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 8e07af5..78dce80 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -37,9 +37,12 @@ import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Properties;
 
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
@@ -644,4 +647,285 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT {
         }
     }
     
+    @Test
+    public void testWithUpsertingRowTimestampColSpecified_desc() throws Exception {
+        testWithUpsertingRowTimestampColSpecified(true);
+    }
+    
+    @Test
+    public void testWithUpsertingRowTimestampColSpecified_asc() throws Exception {
+        testWithUpsertingRowTimestampColSpecified(false);
+    }
+    
+    private void testWithUpsertingRowTimestampColSpecified(boolean desc) throws Exception {
+        String tableName = "testUpsertingRowTimestampCol".toUpperCase();
+        String indexName = "testUpsertingRowTimestampCol_idx".toUpperCase();
+        long ts = nextTimestamp();
+        String sortOrder = desc ? "DESC" : "";
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS " + tableName + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 "+ sortOrder + " ROW_TIMESTAMP " + ")) ");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON  " + tableName + "  (PK2, KV1) INCLUDE (KV2)");
+        }
+        ts = nextTimestamp();
+        long rowTimestamp = ts + 10000;
+        Date rowTimestampDate = new Date(rowTimestamp);
+        try (Connection conn = getConnection(ts)) {
+            // Upsert data with scn set on the connection. However, the timestamp of the put will be the value of the row_timestamp column.
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + tableName + "  (PK1, PK2, KV1, KV2) VALUES (?, ?, ?, ?)");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, rowTimestampDate);
+            stmt.setString(3, "KV1");
+            stmt.setString(4, "KV2");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Verify that the connection with the next time stamp isn't able to see the data inserted above. This 
+            // is because the timestamp of the put was rowTimestamp and not connection scn.
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + tableName + "  WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, rowTimestampDate);
+            ResultSet rs = stmt.executeQuery();
+            assertFalse(rs.next());
+            
+            // Same holds when querying the index table too
+            stmt = conn.prepareStatement("SELECT KV1 FROM  " + tableName + "  WHERE PK2 = ?");
+            stmt.setDate(1, rowTimestampDate);
+            rs = stmt.executeQuery();
+            assertFalse(rs.next());
+        }
+
+        // Verify now that if the connection is at an SCN beyond the rowtimestamp then we can indeed see the
+        // data that we upserted above.
+        try (Connection conn = getConnection(rowTimestamp + 5)) {
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM  " + tableName + "  WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, rowTimestampDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("PK1", rs.getString("PK1"));
+            assertEquals(rowTimestampDate, rs.getDate("PK2"));
+            assertEquals("KV1", rs.getString("KV1"));
+            
+            // Data visible when querying the index table too.
+            stmt = conn.prepareStatement("SELECT KV2 FROM  " + tableName + "  WHERE PK2 = ? AND KV1 = ?");
+            stmt.setDate(1, rowTimestampDate);
+            stmt.setString(2, "KV1");
+            rs = stmt.executeQuery();
+            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+            assertTrue(plan.getTableRef().getTable().getName().getString().equals(indexName));
+            assertTrue(rs.next());
+            assertEquals("KV2", rs.getString("KV2"));
+        }
+    }
+    
+    @Test
+    public void testAutomaticallySettingRowtimestamp_desc() throws Exception {
+        testAutomaticallySettingRowtimestamp("DESC");
+    }
+    
+    @Test
+    public void testAutomaticallySettingRowtimestamp_asc() throws Exception {
+        testAutomaticallySettingRowtimestamp("ASC");
+    }
+    
+    private void testAutomaticallySettingRowtimestamp(String sortOrder) throws Exception {
+        String tableName = "testAutomaticallySettingRowtimestamp".toUpperCase();
+        String indexName = "testAutomaticallySettingRowtimestamp_index".toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS " + tableName + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 "+ sortOrder + " ROW_TIMESTAMP " + ")) ");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON  " + tableName + "  (PK2, KV1) INCLUDE (KV2)");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Upsert values where row_timestamp column PK2 is not set and the column names are specified
+            // This should upsert data with the value for PK2 as new Date(ts);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + tableName + " (PK1, KV1, KV2) VALUES (?, ?, ?)");
+            stmt.setString(1, "PK1");
+            stmt.setString(2, "KV1");
+            stmt.setString(3, "KV2");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        Date upsertedDate = new Date(ts);
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+            // the data in this query.
+            PreparedStatement stmt = conn.prepareStatement("SELECT KV1, KV2 FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, upsertedDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString(1));
+            assertEquals("KV2", rs.getString(2));
+            assertFalse(rs.next());
+            
+            // Verify now that the data was correctly added to the mutable index too.
+            stmt = conn.prepareStatement("SELECT KV2 FROM " + tableName + " WHERE PK2 = ? AND KV1 = ?");
+            stmt.setDate(1, upsertedDate);
+            stmt.setString(2, "KV1");
+            rs = stmt.executeQuery();
+            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+            assertTrue(plan.getTableRef().getTable().getName().getString().equals(indexName));
+            assertTrue(rs.next());
+            assertEquals("KV2", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testAutomaticallySettingRowTimestampForImmutableTableAndIndexes_desc() throws Exception {
+        testAutomaticallySettingRowTimestampForImmutableTableAndIndexes("DESC");
+    }
+    
+    @Test
+    public void testAutomaticallySettingRowTimestampForImmutableTableAndIndexes_asc() throws Exception {
+        testAutomaticallySettingRowTimestampForImmutableTableAndIndexes("ASC");
+    }
+    
+    private void testAutomaticallySettingRowTimestampForImmutableTableAndIndexes(String sortOrder) throws Exception {
+        String tableName = "testSettingRowTimestampForImmutableTableAndIndexes".toUpperCase();
+        String indexName = "testSettingRowTimestampForImmutableTableAndIndexes_index".toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS " + tableName + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 "+ sortOrder + " ROW_TIMESTAMP)) " + "  IMMUTABLE_ROWS=true");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON  " + tableName + "  (PK2, KV1) INCLUDE (KV2)");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Upsert values where row_timestamp column PK2 is not set and the column names are specified
+            // This should upsert data with the value for PK2 as new Date(ts);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO  " + tableName + " (PK1, KV1, KV2) VALUES (?, ?, ?)");
+            stmt.setString(1, "PK1");
+            stmt.setString(2, "KV1");
+            stmt.setString(3, "KV2");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        Date upsertedDate = new Date(ts);
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+            // the data in this query.
+            PreparedStatement stmt = conn.prepareStatement("SELECT KV1, KV2 FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, upsertedDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString(1));
+            assertEquals("KV2", rs.getString(2));
+            assertFalse(rs.next());
+            
+            // Verify now that the data was correctly added to the mutable index too.
+            stmt = conn.prepareStatement("SELECT KV2 FROM " + tableName + " WHERE PK2 = ? AND KV1 = ?");
+            stmt.setDate(1, upsertedDate);
+            stmt.setString(2, "KV1");
+            rs = stmt.executeQuery();
+            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+            assertTrue(plan.getTableRef().getTable().getName().getString().equals(indexName));
+            assertTrue(rs.next());
+            assertEquals("KV2", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+    
+    @Test
+    public void testComparisonOperatorsOnAscRowTimestampCol() throws Exception {
+        testComparisonOperatorsOnRowTimestampCol("ASC");
+    }
+    
+    @Test
+    public void testComparisonOperatorsOnDescRowTimestampCol() throws Exception {
+        testComparisonOperatorsOnRowTimestampCol("DESC");
+    }
+    
+    private void testComparisonOperatorsOnRowTimestampCol(String sortOrder) throws Exception {
+        String tableName = ("testComparisonOperatorsOnRowTimestampCol_" + sortOrder).toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS " + tableName + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 "+ sortOrder + " ROW_TIMESTAMP)) " + "  IMMUTABLE_ROWS=true");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            stmt.setString(1, "a");
+            stmt.setDate(2, new Date(10));
+            stmt.setString(3, "KV");
+            stmt.executeUpdate();
+            stmt.setString(1, "b");
+            stmt.setDate(2, new Date(20));
+            stmt.setString(3, "KV");
+            stmt.executeUpdate();
+            stmt.setString(1, "c");
+            stmt.setDate(2, new Date(30));
+            stmt.setString(3, "KV");
+            stmt.executeUpdate();
+            stmt.setString(1, "d");
+            stmt.setDate(2, new Date(40));
+            stmt.setString(3, "KV");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            assertNumRecords(3, "SELECT count(*) from " + tableName + " WHERE PK2 > ?", conn, new Date(10));
+            assertNumRecords(1, "SELECT count(*) from " + tableName + " WHERE PK2 < ? AND PK2 > ?", conn, new Date(30), new Date(10));
+            assertNumRecords(3, "SELECT count(*) from " + tableName + " WHERE PK2 <= ? AND PK2 >= ?", conn, new Date(30), new Date(10));
+            assertNumRecords(2, "SELECT count(*) from " + tableName + " WHERE PK2 <= ? AND PK2 > ?", conn,  new Date(30), new Date(10));
+            assertNumRecords(2, "SELECT count(*) from " + tableName + " WHERE PK2 < ? AND PK2 >= ?", conn, new Date(30), new Date(10));
+            assertNumRecords(0, "SELECT count(*) from " + tableName + " WHERE PK2 < ?", conn, new Date(10));
+            assertNumRecords(4, "SELECT count(*) from " + tableName, conn);
+        }
+    }
+    
+    private void assertNumRecords(int count, String sql, Connection conn, Date ... params) throws Exception {
+        PreparedStatement stmt = conn.prepareStatement(sql);
+        int counter = 1;
+        for (Date param : params) {
+            stmt.setDate(counter++, param);
+        }
+        ResultSet rs = stmt.executeQuery();
+        assertTrue(rs.next());
+        assertEquals(count, rs.getInt(1));
+    }
+    
+    @Test
+    public void testDisallowNegativeValuesForRowTsColumn() throws Exception {
+        String tableName = "testDisallowNegativeValuesForRowTsColumn".toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 DATE NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            Date d = new Date(-100);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName +  " VALUES (?, ?)");
+            stmt.setDate(1, d);
+            stmt.setString(2, "KV1");
+            stmt.executeUpdate();
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.ILLEGAL_DATA.getErrorCode(), e.getErrorCode());
+        }
+    }
+    
+    private static Connection getConnection(long ts) throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+        return DriverManager.getConnection(getUrl(), props);
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 19286dc..42ac9c4 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -124,6 +124,7 @@ tokens
     REPLACE = 'replace';
     LIST = 'list';
     JARS='jars';
+    ROW_TIMESTAMP='row_timestamp';
 }
 
 
@@ -469,16 +470,17 @@ drop_sequence_node returns [DropSequenceStatement ret]
     ;
 
 pk_constraint returns [PrimaryKeyConstraint ret]
-    :   COMMA? CONSTRAINT n=identifier PRIMARY KEY LPAREN cols=col_name_with_sort_order_list RPAREN { $ret = factory.primaryKey(n,cols); }
+    :   COMMA? CONSTRAINT n=identifier PRIMARY KEY LPAREN cols=col_name_with_sort_order_rowtimestamp_list RPAREN { $ret = factory.primaryKey(n,cols); }
     ;
 
-col_name_with_sort_order_list returns [List<Pair<ColumnName, SortOrder>> ret]
-@init{ret = new ArrayList<Pair<ColumnName, SortOrder>>(); }
-    :   p=col_name_with_sort_order {$ret.add(p);}  (COMMA p = col_name_with_sort_order {$ret.add(p);} )*
+col_name_with_sort_order_rowtimestamp_list returns [List<ColumnDefInPkConstraint> ret]
+@init{ret = new ArrayList<ColumnDefInPkConstraint>(); }
+    :   p=col_name_with_sort_order_rowtimestamp {$ret.add(p);}  (COMMA p = col_name_with_sort_order_rowtimestamp {$ret.add(p);} )*
 ;
-
-col_name_with_sort_order returns [Pair<ColumnName, SortOrder> ret]
-    :   f=identifier (order=ASC|order=DESC)? {$ret = Pair.newPair(factory.columnName(f), order == null ? SortOrder.getDefault() : SortOrder.fromDDLValue(order.getText()));}
+ 
+col_name_with_sort_order_rowtimestamp returns [ColumnDefInPkConstraint ret]
+    :   f=identifier (order=ASC|order=DESC)? (rr=ROW_TIMESTAMP)?
+        { $ret = factory.columnDefInPkConstraint(factory.columnName(f), order == null ? SortOrder.getDefault() : SortOrder.fromDDLValue(order.getText()), rr != null); }
 ;
 
 ik_constraint returns [IndexKeyConstraint ret]
@@ -601,12 +603,13 @@ column_defs returns [List<ColumnDef> ret]
 ;
 
 column_def returns [ColumnDef ret]
-    :   c=column_name dt=identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (nn=NOT? n=NULL)? (pk=PRIMARY KEY (order=ASC|order=DESC)?)?
+    :   c=column_name dt=identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (nn=NOT? n=NULL)? (pk=PRIMARY KEY (order=ASC|order=DESC)? rr=ROW_TIMESTAMP?)?
         { $ret = factory.columnDef(c, dt, ar != null || lsq != null, a == null ? null :  Integer.parseInt( a.getText() ), nn!=null ? Boolean.FALSE : n!=null ? Boolean.TRUE : null, 
             l == null ? null : Integer.parseInt( l.getText() ),
             s == null ? null : Integer.parseInt( s.getText() ),
             pk != null, 
-            order == null ? SortOrder.getDefault() : SortOrder.fromDDLValue(order.getText()) ); }
+            order == null ? SortOrder.getDefault() : SortOrder.fromDDLValue(order.getText()),
+            rr != null); }
     ;
 
 dyn_column_defs returns [List<ColumnDef> ret]
@@ -620,7 +623,8 @@ dyn_column_def returns [ColumnDef ret]
             l == null ? null : Integer.parseInt( l.getText() ),
             s == null ? null : Integer.parseInt( s.getText() ),
             false, 
-            SortOrder.getDefault()); }
+            SortOrder.getDefault(),
+            false); }
     ;
 
 dyn_column_name_or_def returns [ColumnDef ret]
@@ -629,7 +633,8 @@ dyn_column_name_or_def returns [ColumnDef ret]
             l == null ? null : Integer.parseInt( l.getText() ),
             s == null ? null : Integer.parseInt( s.getText() ),
             false, 
-            SortOrder.getDefault()); }
+            SortOrder.getDefault(),
+            false); }
     ;
 
 subquery_expression returns [ParseNode ret]

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index ebbfd9c..96588d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.compile;
 
+import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
+
 import java.sql.ParameterMetaData;
 import java.sql.SQLException;
 import java.util.Arrays;
@@ -147,11 +149,13 @@ public class DeleteCompiler {
                     }
                     table.newKey(ptr, values);
                 }
-                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+                // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
+                // row key will already have its value. 
+                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
                 if (indexTableRef != null) {
                     ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
                     rs.getCurrentRow().getKey(indexPtr);
-                    indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+                    indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
                 }
                 if (mutations.size() > maxSize) {
                     throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -436,7 +440,7 @@ public class DeleteCompiler {
                         Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); 
                         Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
                         while (iterator.hasNext()) {
-                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+                            mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
                         }
                         return new MutationState(tableRef, mutation, 0, maxSize, connection);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 9845498..dd92b00 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -220,7 +220,7 @@ public class FromCompiler {
             Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression();
             PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(),
                     sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
-                    column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr());
+                    column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp());
             projectedColumns.add(projectedColumn);
         }
         PTable t = PTableImpl.makePTable(table, projectedColumns);
@@ -546,7 +546,7 @@ public class FromCompiler {
                         familyName = PNameFactory.newName(family);
                     }
                     allcolumns.add(new PColumnImpl(name, familyName, dynColumn.getDataType(), dynColumn.getMaxLength(),
-                            dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression()));
+                            dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false));
                     position++;
                 }
                 theTable = PTableImpl.makePTable(theTable, allcolumns);
@@ -644,7 +644,7 @@ public class FromCompiler {
                 }
                 PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias),
                         PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY),
-                        null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null);
+                        null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false);
                 columns.add(column);
             }
             PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 4683320..b09771a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -78,7 +78,7 @@ public class ListJarsQueryPlan implements QueryPlan {
         PColumn column =
                 new PColumnImpl(PNameFactory.newName("jar_location"), null,
                         PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
-                        false, null);
+                        false, null, false);
         List<PColumn> columns = new ArrayList<PColumn>();
         columns.add(column);
         Expression expression =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index fcbeb7e..d75fe38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -125,6 +125,7 @@ public class PostDDLCompiler {
                                 return Collections.singletonList(tableRef);
                             }
                             
+                            @Override
                             public java.util.List<PFunction> getFunctions() {
                                 return Collections.emptyList();
                             };
@@ -142,10 +143,12 @@ public class PostDDLCompiler {
                                 return new ColumnRef(tableRef, column.getPosition());
                             }
                             
+                            @Override
                             public PFunction resolveFunction(String functionName) throws SQLException {
                                 throw new UnsupportedOperationException();
                             };
 
+                            @Override
                             public boolean hasUDFs() {
                                 return false;
                             };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index 9f99f1c..f5bb4c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -114,8 +114,9 @@ public class PostIndexDDLCompiler {
         this.selectQuery = selectQueryBuilder.toString();
         updateStmtStr.append(this.selectQuery);
         
-        final PhoenixStatement statement = new PhoenixStatement(connection);
-        return statement.compileMutation(updateStmtStr.toString());
+        try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+            return statement.compileMutation(updateStmtStr.toString());
+        }
     }
 
     public List<String> getIndexColumnNames() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 2a032a3..4d343f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -19,15 +19,19 @@ package org.apache.phoenix.compile;
 
 import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
@@ -35,11 +39,14 @@ import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ScanUtil.BytesComparator;
 import org.apache.phoenix.util.SchemaUtil;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
@@ -48,20 +55,20 @@ import com.google.common.collect.Lists;
 public class ScanRanges {
     private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
     private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
-    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, KeyRange.EVERYTHING_RANGE, KeyRange.EVERYTHING_RANGE, false, false, null);
-    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, KeyRange.EMPTY_RANGE, KeyRange.EMPTY_RANGE, false, false, null);
+    public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, KeyRange.EVERYTHING_RANGE, KeyRange.EVERYTHING_RANGE, false, false, null, null);
+    public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, KeyRange.EMPTY_RANGE, KeyRange.EMPTY_RANGE, false, false, null, null);
     private static final Scan HAS_INTERSECTION = new Scan();
 
     public static ScanRanges createPointLookup(List<KeyRange> keys) {
-        return ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true);
+        return ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
     }
     
     // For testing
     public static ScanRanges createSingleSpan(RowKeySchema schema, List<List<KeyRange>> ranges) {
-        return create(schema, ranges, ScanUtil.getDefaultSlotSpans(ranges.size()), KeyRange.EVERYTHING_RANGE, null, true);
+        return create(schema, ranges, ScanUtil.getDefaultSlotSpans(ranges.size()), KeyRange.EVERYTHING_RANGE, null, true, -1);
     }
     
-    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, KeyRange minMaxRange, Integer nBuckets, boolean useSkipScan) {
+    public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, KeyRange minMaxRange, Integer nBuckets, boolean useSkipScan, int rowTimestampColIndex) {
         int offset = nBuckets == null ? 0 : SaltingUtil.NUM_SALTING_BYTES;
         int nSlots = ranges.size();
         if (nSlots == offset && minMaxRange == KeyRange.EVERYTHING_RANGE) {
@@ -69,6 +76,7 @@ public class ScanRanges {
         } else if (minMaxRange == KeyRange.EMPTY_RANGE || (nSlots == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE)) {
             return NOTHING;
         }
+        TimeRange rowTimestampRange = getRowTimestampColumnRange(ranges, schema, rowTimestampColIndex);
         boolean isPointLookup = isPointLookup(schema, ranges, slotSpan, useSkipScan);
         if (isPointLookup) {
             // TODO: consider keeping original to use for serialization as it would be smaller?
@@ -111,6 +119,7 @@ public class ScanRanges {
             sortedRanges.add(ImmutableList.copyOf(sorted));
         }
         
+        
         // Don't set minMaxRange for point lookup because it causes issues during intersect
         // by going across region boundaries
         KeyRange scanRange = KeyRange.EVERYTHING_RANGE;
@@ -139,7 +148,7 @@ public class ScanRanges {
         if (scanRange == KeyRange.EMPTY_RANGE) {
             return NOTHING;
         }
-        return new ScanRanges(schema, slotSpan, sortedRanges, scanRange, minMaxRange, useSkipScan, isPointLookup, nBuckets);
+        return new ScanRanges(schema, slotSpan, sortedRanges, scanRange, minMaxRange, useSkipScan, isPointLookup, nBuckets, rowTimestampRange);
     }
 
     private SkipScanFilter filter;
@@ -151,13 +160,15 @@ public class ScanRanges {
     private final boolean useSkipScanFilter;
     private final KeyRange scanRange;
     private final KeyRange minMaxRange;
+    private final TimeRange rowTimestampRange;
 
-    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, KeyRange scanRange, KeyRange minMaxRange, boolean useSkipScanFilter, boolean isPointLookup, Integer bucketNum) {
+    private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, KeyRange scanRange, KeyRange minMaxRange, boolean useSkipScanFilter, boolean isPointLookup, Integer bucketNum, TimeRange rowTimestampRange) {
         this.isPointLookup = isPointLookup;
         this.isSalted = bucketNum != null;
         this.useSkipScanFilter = useSkipScanFilter;
         this.scanRange = scanRange;
         this.minMaxRange = minMaxRange;
+        this.rowTimestampRange = rowTimestampRange;
         
         // Only blow out the bucket values if we're using the skip scan. We need all the
         // bucket values in this case because we use intersect against a key that may have
@@ -602,4 +613,89 @@ public class ScanRanges {
         return false;
         
     }
+    
+    private static TimeRange getRowTimestampColumnRange(List<List<KeyRange>> ranges, RowKeySchema schema, int rowTimestampColPos) {
+        try {
+            if (rowTimestampColPos != -1) {
+                if (ranges != null && ranges.size() > rowTimestampColPos) {
+                    List<KeyRange> rowTimestampColRange = ranges.get(rowTimestampColPos);
+                    List<KeyRange> sortedRange = new ArrayList<>(rowTimestampColRange);
+                    Collections.sort(sortedRange, KeyRange.COMPARATOR);
+                    //ranges.set(rowTimestampColPos, sortedRange); //TODO: do I really need to do this?
+                    Field f = schema.getField(rowTimestampColPos);
+                    SortOrder order = f.getSortOrder();
+                    KeyRange lowestRange = rowTimestampColRange.get(0);
+                    KeyRange highestRange = rowTimestampColRange.get(rowTimestampColRange.size() - 1);
+                    if (order == SortOrder.DESC) {
+                        return getDescTimeRange(lowestRange, highestRange, f);
+                    }
+                    return getAscTimeRange( lowestRange, highestRange, f);
+                }
+            }
+        } catch (IOException e) {
+            Throwables.propagate(e);
+        }
+        return null;
+    }
+
+    private static TimeRange getAscTimeRange(KeyRange lowestRange, KeyRange highestRange, Field f)
+            throws IOException {
+        long low;
+        long high;
+        if (lowestRange.lowerUnbound()) {
+            low = 0;
+        } else {
+            long lowerRange = f.getDataType().getCodec().decodeLong(lowestRange.getLowerRange(), 0, SortOrder.ASC);
+            low = lowestRange.isLowerInclusive() ? lowerRange : safelyIncrement(lowerRange);
+        }
+        if (highestRange.upperUnbound()) {
+            high = HConstants.LATEST_TIMESTAMP;
+        } else {
+            long upperRange = f.getDataType().getCodec().decodeLong(highestRange.getUpperRange(), 0, SortOrder.ASC);
+            if (highestRange.isUpperInclusive()) {
+                high = safelyIncrement(upperRange);
+            } else {
+                high = upperRange;
+            }
+        }
+        return new TimeRange(low, high);
+    }
+    
+    public static TimeRange getDescTimeRange(KeyRange lowestKeyRange, KeyRange highestKeyRange, Field f) throws IOException {
+        boolean lowerUnbound = lowestKeyRange.lowerUnbound();
+        boolean lowerInclusive = lowestKeyRange.isLowerInclusive();
+        boolean upperUnbound = highestKeyRange.upperUnbound();
+        boolean upperInclusive = highestKeyRange.isUpperInclusive();
+
+        long low = lowerUnbound ? -1 : f.getDataType().getCodec().decodeLong(lowestKeyRange.getLowerRange(), 0, SortOrder.DESC);
+        long high = upperUnbound ? -1 : f.getDataType().getCodec().decodeLong(highestKeyRange.getUpperRange(), 0, SortOrder.DESC);
+        long newHigh;
+        long newLow;
+        if (!lowerUnbound && !upperUnbound) {
+            newHigh = lowerInclusive ? safelyIncrement(low) : low;
+            newLow = upperInclusive ? high : safelyIncrement(high);
+            return new TimeRange(newLow, newHigh);
+        } else if (!lowerUnbound && upperUnbound) {
+            newHigh = lowerInclusive ? safelyIncrement(low) : low;
+            newLow = 0;
+            return new TimeRange(newLow, newHigh);
+        } else if (lowerUnbound && !upperUnbound) {
+            newLow = upperInclusive ? high : safelyIncrement(high);
+            newHigh = HConstants.LATEST_TIMESTAMP;
+            return new TimeRange(newLow, newHigh);
+        } else {
+            newLow = 0;
+            newHigh = HConstants.LATEST_TIMESTAMP;
+            return new TimeRange(newLow, newHigh);
+        }
+    }
+    
+    private static long safelyIncrement(long value) {
+        return value < HConstants.LATEST_TIMESTAMP ? (value + 1) : HConstants.LATEST_TIMESTAMP;
+    }
+    
+    public TimeRange getRowTimestampRange() {
+        return rowTimestampRange;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 52bb7f2..312de45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -28,7 +28,6 @@ import java.util.TimeZone;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -80,8 +79,6 @@ public class StatementContext {
 
     private TableRef currentTable;
     private List<Pair<byte[], byte[]>> whereConditionColumns;
-    private TimeRange scanTimeRange = null;
-
     private Map<SelectStatement, Object> subqueryResults;
     private final ReadMetricQueue readMetricsQueue;
     private final OverAllQueryMetrics overAllQueryMetrics;
@@ -286,14 +283,6 @@ public class StatementContext {
         return whereConditionColumns;
     }
 
-    public void setScanTimeRange(TimeRange value){
-    	this.scanTimeRange = value;
-    }
-
-    public TimeRange getScanTimeRange() {
-    	return this.scanTimeRange;
-    }
-
     public boolean isSubqueryResultAvailable(SelectStatement select) {
         return subqueryResults.containsKey(select);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index d8616b9..ba95d9b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -77,7 +77,7 @@ public class TraceQueryPlan implements QueryPlan {
         PColumn column =
                 new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null,
                         PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
-                        false, null);
+                        false, null, false);
         List<PColumn> columns = new ArrayList<PColumn>();
         columns.add(column);
         Expression expression =

http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index 942e244..afca97a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -75,7 +75,7 @@ public class UnionCompiler {
             String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias();
             PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME,
                     sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
-                    i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString());
+                    i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false);
             projectedColumns.add(projectedColumn);
         }
         Long scn = statement.getConnection().getSCN();