You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2015/10/03 23:38:23 UTC
[3/3] phoenix git commit: PHOENIX-914 Native HBase timestamp support
to optimize date range queries in Phoenix
PHOENIX-914 Native HBase timestamp support to optimize date range queries in Phoenix
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/72e7ccd1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/72e7ccd1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/72e7ccd1
Branch: refs/heads/4.x-HBase-0.98
Commit: 72e7ccd17f7cbbaae4adaa85a23fd54a46315770
Parents: 911a968
Author: Samarth <sa...@salesforce.com>
Authored: Sat Oct 3 14:37:54 2015 -0700
Committer: Samarth <sa...@salesforce.com>
Committed: Sat Oct 3 14:37:54 2015 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/AlterTableIT.java | 70 +++
.../org/apache/phoenix/end2end/DeleteIT.java | 63 +++
.../apache/phoenix/end2end/UpsertSelectIT.java | 531 +++++++++++++++++++
.../apache/phoenix/end2end/UpsertValuesIT.java | 284 ++++++++++
phoenix-core/src/main/antlr3/PhoenixSQL.g | 27 +-
.../apache/phoenix/compile/DeleteCompiler.java | 10 +-
.../apache/phoenix/compile/FromCompiler.java | 6 +-
.../phoenix/compile/ListJarsQueryPlan.java | 2 +-
.../apache/phoenix/compile/PostDDLCompiler.java | 3 +
.../phoenix/compile/PostIndexDDLCompiler.java | 5 +-
.../org/apache/phoenix/compile/ScanRanges.java | 110 +++-
.../phoenix/compile/StatementContext.java | 11 -
.../apache/phoenix/compile/TraceQueryPlan.java | 2 +-
.../apache/phoenix/compile/UnionCompiler.java | 2 +-
.../apache/phoenix/compile/UpsertCompiler.java | 120 ++++-
.../apache/phoenix/compile/WhereOptimizer.java | 2 +-
.../coprocessor/MetaDataEndpointImpl.java | 14 +-
.../phoenix/coprocessor/MetaDataProtocol.java | 1 +
.../coprocessor/generated/PTableProtos.java | 142 ++++-
.../phoenix/exception/SQLExceptionCode.java | 11 +-
.../apache/phoenix/execute/BaseQueryPlan.java | 29 +-
.../apache/phoenix/execute/MutationState.java | 110 +++-
.../apache/phoenix/iterate/ExplainTable.java | 7 +
.../apache/phoenix/jdbc/PhoenixConnection.java | 11 +-
.../phoenix/jdbc/PhoenixDatabaseMetaData.java | 2 +
.../phoenix/jdbc/PhoenixPreparedStatement.java | 1 -
.../org/apache/phoenix/parse/ColumnDef.java | 11 +-
.../phoenix/parse/ColumnDefInPkConstraint.java | 44 ++
.../apache/phoenix/parse/ParseNodeFactory.java | 21 +-
.../phoenix/parse/PrimaryKeyConstraint.java | 48 +-
.../query/ConnectionQueryServicesImpl.java | 7 +-
.../apache/phoenix/query/QueryConstants.java | 3 +
.../apache/phoenix/schema/DelegateColumn.java | 5 +
.../apache/phoenix/schema/DelegateTable.java | 5 +
.../apache/phoenix/schema/MetaDataClient.java | 245 +++++----
.../java/org/apache/phoenix/schema/PColumn.java | 5 +
.../org/apache/phoenix/schema/PColumnImpl.java | 19 +-
.../java/org/apache/phoenix/schema/PDatum.java | 1 +
.../apache/phoenix/schema/PMetaDataImpl.java | 2 +-
.../java/org/apache/phoenix/schema/PTable.java | 6 +
.../org/apache/phoenix/schema/PTableImpl.java | 16 +
.../org/apache/phoenix/schema/SaltingUtil.java | 2 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 34 ++
.../phoenix/compile/QueryCompilerTest.java | 43 ++
.../phoenix/compile/WhereOptimizerTest.java | 2 +
.../phoenix/execute/CorrelatePlanTest.java | 2 +-
.../phoenix/execute/UnnestArrayPlanTest.java | 4 +-
.../expression/ColumnExpressionTest.java | 8 +-
.../iterate/AggregateResultScannerTest.java | 4 +
.../apache/phoenix/jdbc/PhoenixDriverTest.java | 16 +
.../apache/phoenix/parse/QueryParserTest.java | 2 +-
phoenix-protocol/src/main/PTable.proto | 1 +
52 files changed, 1877 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
index 4053301..dc47d99 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java
@@ -23,6 +23,7 @@ import static org.apache.phoenix.util.TestUtil.closeConnection;
import static org.apache.phoenix.util.TestUtil.closeStatement;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -1998,5 +1999,74 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT {
conn.close();
}
}
+
+ @Test
+ public void testDeclaringColumnAsRowTimestamp() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE T1 (PK1 DATE NOT NULL, PK2 VARCHAR NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1 ROW_TIMESTAMP, PK2)) ");
+ PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+ PTable table = phxConn.getMetaDataCache().getTable(new PTableKey(phxConn.getTenantId(), "T1"));
+ // Assert that only PK1 shows up as the row timestamp column in the metadata cache.
+ assertTrue(table.getColumn("PK1").isRowTimestamp());
+ assertFalse(table.getColumn("PK2").isRowTimestamp());
+ assertIsRowTimestampSet("T1", "PK1"); // and that SYSTEM.CATALOG records it as well
+
+ conn.createStatement().execute("CREATE TABLE T6 (PK1 VARCHAR, PK2 DATE PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR, KV2 INTEGER)");
+ table = phxConn.getMetaDataCache().getTable(new PTableKey(phxConn.getTenantId(), "T6"));
+ // Here ROW_TIMESTAMP is declared inline on PK2; assert that the metadata cache reflects that.
+ assertFalse(table.getColumn("PK1").isRowTimestamp());
+ assertTrue(table.getColumn("PK2").isRowTimestamp());
+ assertIsRowTimestampSet("T6", "PK2");
+
+ // Create an index on a table that has a row timestamp PK column. The column should show up as a row timestamp column for the index too.
+ conn.createStatement().execute("CREATE INDEX T6_IDX ON T6 (KV1) include (KV2)");
+ PTable indexTable = phxConn.getMetaDataCache().getTable(new PTableKey(phxConn.getTenantId(), "T6_IDX"));
+ String indexColName = IndexUtil.getIndexColumnName(table.getColumn("PK2"));
+ // Assert that the index column derived from PK2 shows up as row timestamp in the cache.
+ assertTrue(indexTable.getColumn(indexColName).isRowTimestamp());
+ assertIsRowTimestampSet("T6_IDX", indexColName);
+
+ // Creating a view with a row_timestamp column in its pk constraint is not allowed
+ try {
+ conn.createStatement().execute("CREATE VIEW T6_VIEW (KV3 VARCHAR, KV4 DATE, KV5 INTEGER, CONSTRAINT PK PRIMARY KEY (KV3, KV4 ROW_TIMESTAMP) ) AS SELECT * FROM T6");
+ fail("Creating a view with a row_timestamp column in its pk constraint is not allowed");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.ROWTIMESTAMP_NOT_ALLOWED_ON_VIEW.getErrorCode(), e.getErrorCode());
+ }
+
+ // Make sure that the base table column declared as row_timestamp is also row_timestamp for the view
+ conn.createStatement().execute("CREATE VIEW T6_VIEW (KV3 VARCHAR, KV4 VARCHAR, KV5 INTEGER, CONSTRAINT PK PRIMARY KEY (KV3, KV4) ) AS SELECT * FROM T6");
+ PTable view = phxConn.getMetaDataCache().getTable(new PTableKey(phxConn.getTenantId(), "T6_VIEW"));
+ assertNotNull(view.getPKColumn("PK2"));
+ assertTrue(view.getPKColumn("PK2").isRowTimestamp());
+ }
+ }
+
+ private void assertIsRowTimestampSet(String tableName, String columnName) throws SQLException { // checks IS_ROW_TIMESTAMP in SYSTEM.CATALOG
+ String sql = "SELECT IS_ROW_TIMESTAMP FROM SYSTEM.CATALOG WHERE TABLE_SCHEM IS NULL AND TABLE_NAME = ? AND COLUMN_FAMILY IS NULL AND COLUMN_NAME = ?";
+ try(Connection conn = DriverManager.getConnection(getUrl())) {
+ PreparedStatement stmt = conn.prepareStatement(sql);
+ stmt.setString(1, tableName);
+ stmt.setString(2, columnName);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next()); // a catalog row must exist for this table/column pair
+ assertEquals(true, rs.getBoolean(1)); // and its IS_ROW_TIMESTAMP value must be true
+ }
+ }
+
+ @Test
+ public void testAddingRowTimestampColumnNotAllowedViaAlterTable() throws Exception {
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.createStatement().execute("CREATE TABLE T1 (PK1 VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2)) ");
+ // Adding a new PK column declared as ROW_TIMESTAMP via ALTER TABLE must fail with ROWTIMESTAMP_CREATE_ONLY
+ try {
+ conn.createStatement().execute("ALTER TABLE T1 ADD PK3 DATE PRIMARY KEY ROW_TIMESTAMP");
+ fail("Altering table to add a PK column as row_timestamp column should fail");
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.ROWTIMESTAMP_CREATE_ONLY.getErrorCode(), e.getErrorCode());
+ }
+ }
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 36dff6f..745c730 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -472,6 +472,69 @@ public class DeleteIT extends BaseHBaseManagedTimeIT {
}
}
}
+
+ @Test
+ public void testDeleteForTableWithRowTimestampColServer() throws Exception {
+ testDeleteForTableWithRowTimestampCol(true); // autoCommit = true
+ }
+
+ @Test
+ public void testDeleteForTableWithRowTimestampColClient() throws Exception {
+ testDeleteForTableWithRowTimestampCol(false); // autoCommit = false; the helper commits explicitly
+ }
+
+ private void testDeleteForTableWithRowTimestampCol(boolean autoCommit) throws Exception { // upsert, delete all, expect count 0
+ String tableName = "testDeleteForTableWithRowTimestampCol".toUpperCase();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ conn.setAutoCommit(autoCommit);
+ Statement stm = conn.createStatement();
+ stm.execute("CREATE TABLE IF NOT EXISTS " + tableName +
+ " (HOST CHAR(2) NOT NULL," +
+ "STAT_DATE DATE NOT NULL, \n" +
+ "USAGE.CORE BIGINT," +
+ "USAGE.DB BIGINT," +
+ "CONSTRAINT PK PRIMARY KEY (HOST, STAT_DATE ROW_TIMESTAMP))");
+ stm.close();
+
+ PreparedStatement psInsert = conn
+ .prepareStatement("UPSERT INTO " + tableName + " (HOST, STAT_DATE, CORE, DB) VALUES(?,?,?,?)");
+ psInsert.setString(1, "AA");
+ psInsert.setDate(2, new Date(100)); // explicit value for the ROW_TIMESTAMP column
+ psInsert.setLong(3, 1L);
+ psInsert.setLong(4, 2L);
+ psInsert.execute();
+ psInsert.close();
+ if (!autoCommit) {
+ conn.commit();
+ }
+ conn.createStatement().execute("DELETE FROM " + tableName);
+ if (!autoCommit) {
+ conn.commit();
+ }
+ ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals(0, rs.getLong(1)); // the DELETE must have removed the upserted row
+
+ // Repeat with STAT_DATE (the row_timestamp column) left out of the UPSERT, so that its value is generated by phoenix
+ psInsert = conn
+ .prepareStatement("UPSERT INTO " + tableName + " (HOST, CORE, DB) VALUES(?,?,?)");
+ psInsert.setString(1, "BB");
+ psInsert.setLong(2, 1L);
+ psInsert.setLong(3, 2L);
+ psInsert.execute();
+ psInsert.close();
+ if (!autoCommit) {
+ conn.commit();
+ }
+ conn.createStatement().execute("DELETE FROM " + tableName);
+ if (!autoCommit) {
+ conn.commit();
+ }
+ rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals(0, rs.getLong(1)); // again, no rows may remain after the DELETE
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index d319b4d..aca1602 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.util.TestUtil.A_VALUE;
import static org.apache.phoenix.util.TestUtil.B_VALUE;
@@ -33,6 +34,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.math.BigDecimal;
import java.sql.Connection;
@@ -40,9 +42,13 @@ import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PInteger;
@@ -800,4 +806,529 @@ public class UpsertSelectIT extends BaseClientManagedTimeIT {
conn.close();
}
+
+ @Test
+ public void testUpsertSelectWithRowtimeStampColumn() throws Exception {
+ long ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE TABLE T1 (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
+ conn.createStatement().execute("CREATE TABLE T2 (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
+ conn.createStatement().execute("CREATE TABLE T3 (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
+ }
+
+ // Upsert data with scn set on the connection. However, the timestamp of the put will be the value of the row_timestamp column.
+ ts = nextTimestamp();
+ long rowTimestamp = 1000000;
+ Date rowTimestampDate = new Date(rowTimestamp);
+ try (Connection conn = getConnection(ts)) {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO T1 (PK1, PK2, KV1) VALUES(?, ?, ?)");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, rowTimestampDate);
+ stmt.setString(3, "KV1");
+ stmt.executeUpdate();
+ conn.commit();
+ }
+
+ // Upsert select data into table T2. The connection needs to be at a timestamp beyond the row timestamp. Otherwise
+ // it won't see the data from table T1.
+ try (Connection conn = getConnection(rowTimestamp + 5)) {
+ conn.createStatement().executeUpdate("UPSERT INTO T2 SELECT * FROM T1");
+ conn.commit();
+ // Verify the data upserted in T2. Note that we can use the same connection here because the data was
+ // inserted with a timestamp of rowTimestamp and the connection is at rowTimestamp + 5.
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM T2 WHERE PK1 = ? AND PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, rowTimestampDate);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("PK1", rs.getString("PK1"));
+ assertEquals(rowTimestampDate, rs.getDate("PK2"));
+ assertEquals("KV1", rs.getString("KV1"));
+ }
+
+ // Verify that you can't see the data in T2 if the connection is at next timestamp (which is lower than the row timestamp).
+ try (Connection conn = getConnection(nextTimestamp())) {
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM T2 WHERE PK1 = ? AND PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, rowTimestampDate);
+ ResultSet rs = stmt.executeQuery();
+ assertFalse(rs.next());
+ }
+
+ // Upsert select data into table T3. The connection needs to be at a timestamp beyond the row timestamp. Otherwise
+ // it won't see the data from table T2.
+ try (Connection conn = getConnection(rowTimestamp + 5)) {
+ conn.createStatement().executeUpdate("UPSERT INTO T3 SELECT * FROM T2");
+ conn.commit();
+ // Verify the data upserted in T3. Note that we can use the same connection here because the data was
+ // inserted with a timestamp of rowTimestamp and the connection is at rowTimestamp + 5.
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM T3 WHERE PK1 = ? AND PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, rowTimestampDate);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("PK1", rs.getString("PK1"));
+ assertEquals(rowTimestampDate, rs.getDate("PK2"));
+ assertEquals("KV1", rs.getString("KV1"));
+ }
+
+ // Verify that you can't see the data in T3 if the connection is at next timestamp (which is lower than the row timestamp).
+ try (Connection conn = getConnection(nextTimestamp())) {
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM T3 WHERE PK1 = ? AND PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, rowTimestampDate);
+ ResultSet rs = stmt.executeQuery();
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testUpsertSelectSameTableWithRowTimestampColumn() throws Exception {
+ String tableName = "testUpsertSelectSameTableWithRowTimestampColumn".toUpperCase();
+ long ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
+ }
+
+ // Upsert data with scn set on the connection. The timestamp of the put will be the value of the row_timestamp column.
+ ts = nextTimestamp();
+ long rowTimestamp = ts + 100000;
+ Date rowTimestampDate = new Date(rowTimestamp);
+ try (Connection conn = getConnection(ts)) {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, PK2, KV1) VALUES(?, ?, ?)");
+ stmt.setInt(1, 1);
+ stmt.setDate(2, rowTimestampDate);
+ stmt.setString(3, "KV1");
+ stmt.executeUpdate();
+ conn.commit();
+ }
+
+ try (Connection conn = getConnection(nextTimestamp())) {
+ conn.createStatement().execute("CREATE SEQUENCE T_SEQ");
+ }
+ // Upsert select data into table. The connection needs to be at a timestamp beyond the row timestamp. Otherwise
+ // it won't see the data from table.
+ try (Connection conn = getConnection(rowTimestamp + 5)) {
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR T_SEQ, PK2 FROM " + tableName);
+ conn.commit();
+ }
+
+ // Repeat the same sequence-driven upsert select with auto commit on; pass i is expected to report 2^i upserted rows.
+ try (Connection conn = getConnection(rowTimestamp + 5)) {
+ conn.setAutoCommit(true);
+ for (int i = 0; i < 10; i++) {
+ int count = conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR T_SEQ, PK2 FROM " + tableName);
+ assertEquals((int)Math.pow(2, i), count);
+ }
+ }
+ }
+
+ @Test
+ public void testAutomaticallySettingRowtimestamp() throws Exception {
+ String table1 = "testAutomaticallySettingRowtimestamp1".toUpperCase();
+ String table2 = "testAutomaticallySettingRowtimestamp2".toUpperCase();
+ String table3 = "testAutomaticallySettingRowtimestamp3".toUpperCase();
+ long ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE TABLE " + table1 + " (T1PK1 VARCHAR NOT NULL, T1PK2 DATE NOT NULL, T1KV1 VARCHAR, T1KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T1PK1, T1PK2 DESC ROW_TIMESTAMP)) ");
+ conn.createStatement().execute("CREATE TABLE " + table2 + " (T2PK1 VARCHAR NOT NULL, T2PK2 DATE NOT NULL, T2KV1 VARCHAR, T2KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T2PK1, T2PK2 ROW_TIMESTAMP)) ");
+ conn.createStatement().execute("CREATE TABLE " + table3 + " (T3PK1 VARCHAR NOT NULL, T3PK2 DATE NOT NULL, T3KV1 VARCHAR, T3KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T3PK1, T3PK2 DESC ROW_TIMESTAMP)) ");
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ // Upsert values where the row_timestamp column T1PK2 is not set and the column names are specified.
+ // This should upsert data with the value for T1PK2 as new Date(ts);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table1 + " (T1PK1, T1KV1, T1KV2) VALUES (?, ?, ?)");
+ stmt.setString(1, "PK1");
+ stmt.setString(2, "KV1");
+ stmt.setString(3, "KV2");
+ stmt.executeUpdate();
+ conn.commit();
+ }
+ Date upsertedDate = new Date(ts);
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+ // the data in this query.
+ PreparedStatement stmt = conn.prepareStatement("SELECT T1KV1, T1KV2 FROM " + table1 + " WHERE T1PK1 = ? AND T1PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, upsertedDate);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("KV1", rs.getString(1));
+ assertEquals("KV2", rs.getString(2));
+ assertFalse(rs.next());
+ }
+
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ // Upsert select into table2 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the scn of the connection
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table2 + " (T2PK1, T2KV1, T2KV2) SELECT T1PK1, T1KV1, T1KV2 FROM " + table1);
+ stmt.executeUpdate();
+ conn.commit();
+ }
+
+ upsertedDate = new Date(ts);
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+ // the data in this query.
+ PreparedStatement stmt = conn.prepareStatement("SELECT T2KV1, T2KV2 FROM " + table2 + " WHERE T2PK1 = ? AND T2PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, upsertedDate);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("KV1", rs.getString(1));
+ assertEquals("KV2", rs.getString(2));
+ assertFalse(rs.next());
+ }
+
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ // Upsert select into table3 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the scn of the connection
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table3 + " (T3PK1, T3KV1, T3KV2) SELECT T2PK1, T2KV1, T2KV2 FROM " + table2);
+ stmt.executeUpdate();
+ conn.commit();
+ }
+
+ upsertedDate = new Date(ts);
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+ // the data in this query.
+ PreparedStatement stmt = conn.prepareStatement("SELECT T3KV1, T3KV2 FROM " + table3 + " WHERE T3PK1 = ? AND T3PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, upsertedDate);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("KV1", rs.getString(1));
+ assertEquals("KV2", rs.getString(2));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testUpsertSelectAutoCommitWithRowTimestampColumn() throws Exception {
+ String tableName1 = "testUpsertSelectServerSideWithRowTimestampColumn".toUpperCase();
+ String tableName2 = "testUpsertSelectServerSideWithRowTimestampColumn2".toUpperCase();
+ long ts = 10;
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE TABLE " + tableName1 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP, PK3)) ");
+ conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP, PK3)) ");
+ }
+
+ String[] tableNames = {tableName1, tableName2};
+ for (String tableName : tableNames) {
+ // Upsert data with scn set on the connection. The timestamp of the put will be the value of the row_timestamp column.
+ long rowTimestamp1 = 100;
+ Date rowTimestampDate = new Date(rowTimestamp1);
+ try (Connection conn = getConnection(ts)) {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, PK2, PK3, KV1) VALUES(?, ?, ?, ?)");
+ stmt.setInt(1, 1);
+ stmt.setDate(2, rowTimestampDate);
+ stmt.setInt(3, 3);
+ stmt.setString(4, "KV1");
+ stmt.executeUpdate();
+ conn.commit();
+ }
+
+ long rowTimestamp2 = 2 * rowTimestamp1;
+ try (Connection conn = getConnection(rowTimestamp2)) {
+ conn.setAutoCommit(true);
+ // Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
+ // creating a new row whose timestamp is the SCN of the connection. The same SCN will be used
+ // for the row key too.
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " (PK1, PK3, KV1) SELECT PK1, PK3, KV1 FROM " + tableName);
+ }
+ try (Connection conn = getConnection(3 * rowTimestamp1)) {
+ // Verify the row that was upserted above
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ?");
+ stmt.setInt(1, 1);
+ stmt.setDate(2, new Date(rowTimestamp2));
+ stmt.setInt(3, 3);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt("PK1"));
+ assertEquals(3, rs.getInt("PK3"));
+ assertEquals("KV1", rs.getString("KV1"));
+ assertEquals(new Date(rowTimestamp2), rs.getDate("PK2"));
+ assertFalse(rs.next());
+ // Number of rows in the table should be 2.
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+
+ }
+ ts = 5 * rowTimestamp1;
+ try (Connection conn = getConnection(ts)) {
+ conn.setAutoCommit(true);
+ // Upsert select in the same table with the row_timestamp column PK2 specified. This will not end up creating a new row
+ // because the destination pk columns, including the row timestamp column PK2, are the same as the source columns.
+ conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " (PK1, PK2, PK3, KV1) SELECT PK1, PK2, PK3, KV1 FROM " + tableName);
+ }
+ ts = 6 * rowTimestamp1;
+ try (Connection conn = getConnection(ts)) {
+ // Verify that two rows were created. One with rowtimestamp1 and the other with rowtimestamp2
+ ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1)); // still 2: the second upsert select added no rows
+ assertFalse(rs.next());
+ }
+
+ }
+ }
+
+ @Test
+ public void testRowTimestampColWithViewsIndexesAndSaltedTables() throws Exception {
+ String baseTable = "testRowTimestampColWithViewsIndexesAndSaltedTables".toUpperCase();
+ String tenantView = "tenatView".toUpperCase();
+ String globalView = "globalView".toUpperCase();
+ String baseTableIdx = "table_idx".toUpperCase();
+ String tenantViewIdx = "tenantView_idx".toUpperCase();
+
+ long ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE TABLE " + baseTable + " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR, KV2 VARCHAR, KV3 VARCHAR CONSTRAINT PK PRIMARY KEY(TENANT_ID, PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT = true, SALT_BUCKETS = 8");
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE INDEX " + baseTableIdx + " ON " + baseTable + " (PK2, KV3) INCLUDE (KV1)");
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE VIEW " + globalView + " AS SELECT * FROM " + baseTable + " WHERE KV1 = 'KV1'");
+ }
+ String tenantId = "tenant1";
+ ts = nextTimestamp();
+ try (Connection conn = getTenantConnection(tenantId, ts)) {
+ conn.createStatement().execute("CREATE VIEW " + tenantView + " AS SELECT * FROM " + baseTable);
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getTenantConnection(tenantId, ts)) {
+ conn.createStatement().execute("CREATE INDEX " + tenantViewIdx + " ON " + tenantView + " (PK2, KV2) INCLUDE (KV1)");
+ }
+
+ // upsert data into base table without specifying the row timestamp column PK2
+ long upsertedTs = 5;
+ try (Connection conn = getConnection(upsertedTs)) {
+ // Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
+ // creating a new row whose timestamp is the SCN of the connection. The same SCN will be used
+ // for the row key too.
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + baseTable + " (TENANT_ID, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)");
+ stmt.setString(1, tenantId);
+ stmt.setInt(2, 3);
+ stmt.setString(3, "KV1");
+ stmt.setString(4, "KV2");
+ stmt.setString(5, "KV3");
+ stmt.executeUpdate();
+ conn.commit();
+ }
+
+ // Verify that we can see data when querying through base table, global view and index on the base table
+ try (Connection conn = getConnection(nextTimestamp())) {
+ // Query the base table
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ?");
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, new Date(upsertedTs));
+ stmt.setInt(3, 3);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(tenantId, rs.getString("TENANT_ID"));
+ assertEquals("KV1", rs.getString("KV1"));
+ assertEquals("KV2", rs.getString("KV2"));
+ assertEquals("KV3", rs.getString("KV3"));
+ assertEquals(new Date(upsertedTs), rs.getDate("PK2"));
+ assertFalse(rs.next());
+
+ // Query the globalView
+ stmt = conn.prepareStatement("SELECT * FROM " + globalView + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ?");
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, new Date(upsertedTs));
+ stmt.setInt(3, 3);
+ rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(tenantId, rs.getString("TENANT_ID"));
+ assertEquals("KV1", rs.getString("KV1"));
+ assertEquals("KV2", rs.getString("KV2"));
+ assertEquals("KV3", rs.getString("KV3"));
+ assertEquals(new Date(upsertedTs), rs.getDate("PK2"));
+ assertFalse(rs.next());
+
+ // Query using the index on base table
+ stmt = conn.prepareStatement("SELECT KV1 FROM " + baseTable + " WHERE PK2 = ? AND KV3 = ?");
+ stmt.setDate(1, new Date(upsertedTs));
+ stmt.setString(2, "KV3");
+ rs = stmt.executeQuery();
+ QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+ assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
+ assertTrue(rs.next());
+ assertEquals("KV1", rs.getString("KV1"));
+ assertFalse(rs.next());
+ }
+
+ // Verify that data can be queried using tenant view and tenant view index
+ try (Connection tenantConn = getTenantConnection(tenantId, nextTimestamp())) {
+ // Query the tenant view
+ PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM " + tenantView + " WHERE PK2 = ? AND PK3 = ?");
+ stmt.setDate(1, new Date(upsertedTs));
+ stmt.setInt(2, 3);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("KV1", rs.getString("KV1"));
+ assertEquals("KV2", rs.getString("KV2"));
+ assertEquals("KV3", rs.getString("KV3"));
+ assertEquals(new Date(upsertedTs), rs.getDate("PK2"));
+ assertFalse(rs.next());
+
+ // Query using the index on the tenantView
+ //TODO: uncomment the code after PHOENIX-2277 is fixed
+// stmt = tenantConn.prepareStatement("SELECT KV1 FROM " + tenantView + " WHERE PK2 = ? AND KV2 = ?");
+// stmt.setDate(1, new Date(upsertedTs));
+// stmt.setString(2, "KV2");
+// rs = stmt.executeQuery();
+// QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+// assertTrue(plan.getTableRef().getTable().getName().getString().equals(tenantViewIdx));
+// assertTrue(rs.next());
+// assertEquals("KV1", rs.getString("KV1"));
+// assertFalse(rs.next());
+ }
+
+ upsertedTs = nextTimestamp();
+ try (Connection tenantConn = getTenantConnection(tenantId, upsertedTs)) {
+ // Upsert into tenant view where the row_timestamp column PK2 is not specified
+ PreparedStatement stmt = tenantConn.prepareStatement("UPSERT INTO " + tenantView + " (PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?)");
+ stmt.setInt(1, 33);
+ stmt.setString(2, "KV13");
+ stmt.setString(3, "KV23");
+ stmt.setString(4, "KV33");
+ stmt.executeUpdate();
+ tenantConn.commit();
+ // Upsert into tenant view where the row_timestamp column PK2 is specified
+ stmt = tenantConn.prepareStatement("UPSERT INTO " + tenantView + " (PK2, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)");
+ stmt.setDate(1, new Date(upsertedTs));
+ stmt.setInt(2, 44);
+ stmt.setString(3, "KV14");
+ stmt.setString(4, "KV24");
+ stmt.setString(5, "KV34");
+ stmt.executeUpdate();
+ tenantConn.commit();
+ }
+
+ // Verify that the data upserted using the tenant view can now be queried using base table and the base table index
+ try (Connection conn = getConnection(upsertedTs + 10000)) {
+ // Query the base table
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ? ");
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, new Date(upsertedTs));
+ stmt.setInt(3, 33);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(tenantId, rs.getString("TENANT_ID"));
+ assertEquals("KV13", rs.getString("KV1"));
+ assertEquals("KV23", rs.getString("KV2"));
+ assertEquals("KV33", rs.getString("KV3"));
+ assertFalse(rs.next());
+
+ stmt = conn.prepareStatement("SELECT * FROM " + baseTable + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ? ");
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, new Date(upsertedTs));
+ stmt.setInt(3, 44);
+ rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(tenantId, rs.getString("TENANT_ID"));
+ assertEquals("KV14", rs.getString("KV1"));
+ assertEquals("KV24", rs.getString("KV2"));
+ assertEquals("KV34", rs.getString("KV3"));
+ assertFalse(rs.next());
+
+ // Query using the index on base table
+ stmt = conn.prepareStatement("SELECT KV1 FROM " + baseTable + " WHERE (PK2, KV3) IN ((?, ?), (?, ?)) ORDER BY KV1");
+ stmt.setDate(1, new Date(upsertedTs));
+ stmt.setString(2, "KV33");
+ stmt.setDate(3, new Date(upsertedTs));
+ stmt.setString(4, "KV34");
+ rs = stmt.executeQuery();
+ QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+ assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
+ assertTrue(rs.next());
+ assertEquals("KV13", rs.getString("KV1"));
+ assertTrue(rs.next());
+ assertEquals("KV14", rs.getString("KV1"));
+ assertFalse(rs.next());
+ }
+
+ // Verify that the data upserted using the tenant view can now be queried using tenant view
+ try (Connection tenantConn = getTenantConnection(tenantId, upsertedTs + 10000)) {
+ // Query the tenant view
+ PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM " + tenantView + " WHERE (PK2, PK3) IN ((?, ?), (?, ?)) ORDER BY KV1");
+ stmt.setDate(1, new Date(upsertedTs));
+ stmt.setInt(2, 33);
+ stmt.setDate(3, new Date(upsertedTs));
+ stmt.setInt(4, 44);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("KV13", rs.getString("KV1"));
+ assertTrue(rs.next());
+ assertEquals("KV14", rs.getString("KV1"));
+ assertFalse(rs.next());
+
+ //TODO: uncomment the code after PHOENIX-2277 is fixed
+// // Query using the index on the tenantView
+// stmt = tenantConn.prepareStatement("SELECT KV1 FROM " + tenantView + " WHERE (PK2, KV2) IN (?, ?, ?, ?) ORDER BY KV1");
+// stmt.setDate(1, new Date(upsertedTs));
+// stmt.setString(2, "KV23");
+// stmt.setDate(3, new Date(upsertedTs));
+// stmt.setString(4, "KV24");
+// rs = stmt.executeQuery();
+// QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+// assertTrue(plan.getTableRef().getTable().getName().getString().equals(tenantViewIdx));
+// assertTrue(rs.next());
+// assertEquals("KV13", rs.getString("KV1"));
+// assertTrue(rs.next());
+// assertEquals("KV14", rs.getString("KV1"));
+// assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testDisallowNegativeValuesForRowTsColumn() throws Exception {
+ String tableName = "testDisallowNegativeValuesForRowTsColumn".toUpperCase();
+ String tableName2 = "testDisallowNegativeValuesForRowTsColumn2".toUpperCase();
+ long ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+ conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ long upsertedTs = 100;
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?)");
+ stmt.setLong(1, upsertedTs);
+ stmt.setString(2, "KV1");
+ stmt.executeUpdate();
+ conn.commit();
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName2 + " SELECT (PK1 - 500), KV1 FROM " + tableName);
+ stmt.executeUpdate();
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.ILLEGAL_DATA.getErrorCode(), e.getErrorCode());
+ }
+ }
+
+ private static Connection getConnection(long ts) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ return DriverManager.getConnection(getUrl(), props);
+ }
+
+ private static Connection getTenantConnection(String tenantId, long ts) throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ props.setProperty(TENANT_ID_ATTRIB, tenantId);
+ return DriverManager.getConnection(getUrl(), props);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 8e07af5..78dce80 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -37,9 +37,12 @@ import java.sql.Time;
import java.sql.Timestamp;
import java.util.Properties;
+import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
@@ -644,4 +647,285 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT {
}
}
+ @Test
+ public void testWithUpsertingRowTimestampColSpecified_desc() throws Exception {
+ testWithUpsertingRowTimestampColSpecified(true);
+ }
+
+ @Test
+ public void testWithUpsertingRowTimestampColSpecified_asc() throws Exception {
+ testWithUpsertingRowTimestampColSpecified(false);
+ }
+
+ private void testWithUpsertingRowTimestampColSpecified(boolean desc) throws Exception {
+ String tableName = "testUpsertingRowTimestampCol".toUpperCase();
+ String indexName = "testUpsertingRowTimestampCol_idx".toUpperCase();
+ long ts = nextTimestamp();
+ String sortOrder = desc ? "DESC" : "";
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE TABLE IF NOT EXISTS " + tableName + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 "+ sortOrder + " ROW_TIMESTAMP " + ")) ");
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + tableName + " (PK2, KV1) INCLUDE (KV2)");
+ }
+ ts = nextTimestamp();
+ long rowTimestamp = ts + 10000;
+ Date rowTimestampDate = new Date(rowTimestamp);
+ try (Connection conn = getConnection(ts)) {
+ // Upsert data with scn set on the connection. However, the timestamp of the put will be the value of the row_timestamp column.
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, PK2, KV1, KV2) VALUES (?, ?, ?, ?)");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, rowTimestampDate);
+ stmt.setString(3, "KV1");
+ stmt.setString(4, "KV2");
+ stmt.executeUpdate();
+ conn.commit();
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ // Verify that the connection with the next time stamp isn't able to see the data inserted above. This
+ // is because the timestamp of the put was rowTimestamp and not connection scn.
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, rowTimestampDate);
+ ResultSet rs = stmt.executeQuery();
+ assertFalse(rs.next());
+
+ // Same holds when querying the index table too
+ stmt = conn.prepareStatement("SELECT KV1 FROM " + tableName + " WHERE PK2 = ?");
+ stmt.setDate(1, rowTimestampDate);
+ rs = stmt.executeQuery();
+ assertFalse(rs.next());
+ }
+
+ // Verify now that if the connection is at an SCN beyond the rowtimestamp then we can indeed see the
+ // data that we upserted above.
+ try (Connection conn = getConnection(rowTimestamp + 5)) {
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?");
+ stmt.setString(1, "PK1");
+ stmt.setDate(2, rowTimestampDate);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("PK1", rs.getString("PK1"));
+ assertEquals(rowTimestampDate, rs.getDate("PK2"));
+ assertEquals("KV1", rs.getString("KV1"));
+
+ // Data visible when querying the index table too.
+ stmt = conn.prepareStatement("SELECT KV2 FROM " + tableName + " WHERE PK2 = ? AND KV1 = ?");
+ stmt.setDate(1, rowTimestampDate);
+ stmt.setString(2, "KV1");
+ rs = stmt.executeQuery();
+ QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+ assertTrue(plan.getTableRef().getTable().getName().getString().equals(indexName));
+ assertTrue(rs.next());
+ assertEquals("KV2", rs.getString("KV2"));
+ }
+ }
+
+    @Test
+    public void testAutomaticallySettingRowtimestamp_desc() throws Exception {
+        testAutomaticallySettingRowtimestamp("DESC");
+    }
+
+    @Test
+    public void testAutomaticallySettingRowtimestamp_asc() throws Exception {
+        testAutomaticallySettingRowtimestamp("ASC");
+    }
+
+    /**
+     * Upserts a row without supplying the ROW_TIMESTAMP column PK2 and checks that the row can be
+     * found, through the data table and through its index, using {@code new Date(ts)} of the
+     * upserting connection as the PK2 value.
+     *
+     * @param sortOrder sort order keyword placed after PK2 in the primary key constraint
+     */
+    private void testAutomaticallySettingRowtimestamp(String sortOrder) throws Exception {
+        // Give each variant its own table and index: because the DDL uses IF NOT EXISTS, a table
+        // left over from the other variant would otherwise be reused together with its sort order.
+        String tableName = ("testAutomaticallySettingRowtimestamp_" + sortOrder).toUpperCase();
+        String indexName = ("testAutomaticallySettingRowtimestamp_index_" + sortOrder).toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS " + tableName + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 "+ sortOrder + " ROW_TIMESTAMP " + ")) ");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + tableName + " (PK2, KV1) INCLUDE (KV2)");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Upsert values where row_timestamp column PK2 is not set and the column names are specified
+            // This should upsert data with the value for PK2 as new Date(ts);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, KV1, KV2) VALUES (?, ?, ?)");
+            stmt.setString(1, "PK1");
+            stmt.setString(2, "KV1");
+            stmt.setString(3, "KV2");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        // Captured before ts advances: this is the SCN of the connection that did the upsert.
+        Date upsertedDate = new Date(ts);
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+            // the data in this query.
+            PreparedStatement stmt = conn.prepareStatement("SELECT KV1, KV2 FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, upsertedDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString(1));
+            assertEquals("KV2", rs.getString(2));
+            assertFalse(rs.next());
+
+            // Verify now that the data was correctly added to the mutable index too.
+            stmt = conn.prepareStatement("SELECT KV2 FROM " + tableName + " WHERE PK2 = ? AND KV1 = ?");
+            stmt.setDate(1, upsertedDate);
+            stmt.setString(2, "KV1");
+            rs = stmt.executeQuery();
+            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+            // assertEquals reports the table the plan actually chose if the index was not used.
+            assertEquals(indexName, plan.getTableRef().getTable().getName().getString());
+            assertTrue(rs.next());
+            assertEquals("KV2", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testAutomaticallySettingRowTimestampForImmutableTableAndIndexes_desc() throws Exception {
+        testAutomaticallySettingRowTimestampForImmutableTableAndIndexes("DESC");
+    }
+
+    @Test
+    public void testAutomaticallySettingRowTimestampForImmutableTableAndIndexes_asc() throws Exception {
+        testAutomaticallySettingRowTimestampForImmutableTableAndIndexes("ASC");
+    }
+
+    /**
+     * Same scenario as the automatic row timestamp test, but against a table created with
+     * {@code IMMUTABLE_ROWS=true}: upserts a row without supplying the ROW_TIMESTAMP column PK2 and
+     * checks that it can be found, through the data table and through its index, using
+     * {@code new Date(ts)} of the upserting connection as the PK2 value.
+     *
+     * @param sortOrder sort order keyword placed after PK2 in the primary key constraint
+     */
+    private void testAutomaticallySettingRowTimestampForImmutableTableAndIndexes(String sortOrder) throws Exception {
+        // Give each variant its own table and index: because the DDL uses IF NOT EXISTS, a table
+        // left over from the other variant would otherwise be reused together with its sort order.
+        String tableName = ("testSettingRowTimestampForImmutableTableAndIndexes_" + sortOrder).toUpperCase();
+        String indexName = ("testSettingRowTimestampForImmutableTableAndIndexes_index_" + sortOrder).toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE TABLE IF NOT EXISTS " + tableName + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR, KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 "+ sortOrder + " ROW_TIMESTAMP)) " + " IMMUTABLE_ROWS=true");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + tableName + " (PK2, KV1) INCLUDE (KV2)");
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Upsert values where row_timestamp column PK2 is not set and the column names are specified
+            // This should upsert data with the value for PK2 as new Date(ts);
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, KV1, KV2) VALUES (?, ?, ?)");
+            stmt.setString(1, "PK1");
+            stmt.setString(2, "KV1");
+            stmt.setString(3, "KV2");
+            stmt.executeUpdate();
+            conn.commit();
+        }
+        // Captured before ts advances: this is the SCN of the connection that did the upsert.
+        Date upsertedDate = new Date(ts);
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
+            // the data in this query.
+            PreparedStatement stmt = conn.prepareStatement("SELECT KV1, KV2 FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?");
+            stmt.setString(1, "PK1");
+            stmt.setDate(2, upsertedDate);
+            ResultSet rs = stmt.executeQuery();
+            assertTrue(rs.next());
+            assertEquals("KV1", rs.getString(1));
+            assertEquals("KV2", rs.getString(2));
+            assertFalse(rs.next());
+
+            // Verify now that the data was correctly added to the index on the immutable table too.
+            stmt = conn.prepareStatement("SELECT KV2 FROM " + tableName + " WHERE PK2 = ? AND KV1 = ?");
+            stmt.setDate(1, upsertedDate);
+            stmt.setString(2, "KV1");
+            rs = stmt.executeQuery();
+            QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+            // assertEquals reports the table the plan actually chose if the index was not used.
+            assertEquals(indexName, plan.getTableRef().getTable().getName().getString());
+            assertTrue(rs.next());
+            assertEquals("KV2", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testComparisonOperatorsOnAscRowTimestampCol() throws Exception {
+        testComparisonOperatorsOnRowTimestampCol("ASC");
+    }
+
+    @Test
+    public void testComparisonOperatorsOnDescRowTimestampCol() throws Exception {
+        testComparisonOperatorsOnRowTimestampCol("DESC");
+    }
+
+    /**
+     * Loads four rows whose ROW_TIMESTAMP column PK2 is 10, 20, 30 and 40 and checks the row counts
+     * returned for range predicates on PK2.
+     *
+     * @param sortOrder sort order keyword placed after PK2 in the primary key constraint; also
+     *            used as a suffix of the table name
+     */
+    private void testComparisonOperatorsOnRowTimestampCol(String sortOrder) throws Exception {
+        String tableName = ("testComparisonOperatorsOnRowTimestampCol_" + sortOrder).toUpperCase();
+        long ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            String ddl = "CREATE TABLE IF NOT EXISTS " + tableName + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 "+ sortOrder + " ROW_TIMESTAMP)) " + " IMMUTABLE_ROWS=true";
+            conn.createStatement().execute(ddl);
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?, ?)");
+            // Rows (a, 10), (b, 20), (c, 30), (d, 40), all sharing the same KV1 value.
+            String[] pk1Values = {"a", "b", "c", "d"};
+            for (int i = 0; i < pk1Values.length; i++) {
+                stmt.setString(1, pk1Values[i]);
+                stmt.setDate(2, new Date(10L * (i + 1)));
+                stmt.setString(3, "KV");
+                stmt.executeUpdate();
+            }
+            conn.commit();
+        }
+        ts = nextTimestamp();
+        try (Connection conn = getConnection(ts)) {
+            String countQuery = "SELECT count(*) from " + tableName;
+            Date lower = new Date(10);
+            Date upper = new Date(30);
+            assertNumRecords(3, countQuery + " WHERE PK2 > ?", conn, lower);
+            assertNumRecords(1, countQuery + " WHERE PK2 < ? AND PK2 > ?", conn, upper, lower);
+            assertNumRecords(3, countQuery + " WHERE PK2 <= ? AND PK2 >= ?", conn, upper, lower);
+            assertNumRecords(2, countQuery + " WHERE PK2 <= ? AND PK2 > ?", conn, upper, lower);
+            assertNumRecords(2, countQuery + " WHERE PK2 < ? AND PK2 >= ?", conn, upper, lower);
+            assertNumRecords(0, countQuery + " WHERE PK2 < ?", conn, lower);
+            assertNumRecords(4, countQuery, conn);
+        }
+    }
+
+    /**
+     * Runs {@code sql} and asserts that the first column of its first row equals {@code count}.
+     *
+     * @param count expected integer value of the first column of the first row
+     * @param sql query text; each {@code ?} placeholder is bound, in order, from {@code params}
+     * @param conn connection used to run the query; it is not closed by this method
+     * @param params DATE bind values, one per placeholder
+     */
+    private void assertNumRecords(int count, String sql, Connection conn, Date ... params) throws Exception {
+        // Release the statement and result set on every call instead of leaving them open
+        // until the caller closes the connection.
+        try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+            int counter = 1;
+            for (Date param : params) {
+                stmt.setDate(counter++, param);
+            }
+            try (ResultSet rs = stmt.executeQuery()) {
+                assertTrue(rs.next());
+                assertEquals(count, rs.getInt(1));
+            }
+        }
+    }
+
+ @Test
+ public void testDisallowNegativeValuesForRowTsColumn() throws Exception {
+ String tableName = "testDisallowNegativeValuesForRowTsColumn".toUpperCase();
+ long ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 DATE NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+ }
+ ts = nextTimestamp();
+ try (Connection conn = getConnection(ts)) {
+ Date d = new Date(-100);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?)");
+ stmt.setDate(1, d);
+ stmt.setString(2, "KV1");
+ stmt.executeUpdate();
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.ILLEGAL_DATA.getErrorCode(), e.getErrorCode());
+ }
+ }
+
+ private static Connection getConnection(long ts) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
+ return DriverManager.getConnection(getUrl(), props);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 19286dc..42ac9c4 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -124,6 +124,7 @@ tokens
REPLACE = 'replace';
LIST = 'list';
JARS='jars';
+ ROW_TIMESTAMP='row_timestamp';
}
@@ -469,16 +470,17 @@ drop_sequence_node returns [DropSequenceStatement ret]
;
pk_constraint returns [PrimaryKeyConstraint ret]
- : COMMA? CONSTRAINT n=identifier PRIMARY KEY LPAREN cols=col_name_with_sort_order_list RPAREN { $ret = factory.primaryKey(n,cols); }
+ : COMMA? CONSTRAINT n=identifier PRIMARY KEY LPAREN cols=col_name_with_sort_order_rowtimestamp_list RPAREN { $ret = factory.primaryKey(n,cols); }
;
-col_name_with_sort_order_list returns [List<Pair<ColumnName, SortOrder>> ret]
-@init{ret = new ArrayList<Pair<ColumnName, SortOrder>>(); }
- : p=col_name_with_sort_order {$ret.add(p);} (COMMA p = col_name_with_sort_order {$ret.add(p);} )*
+col_name_with_sort_order_rowtimestamp_list returns [List<ColumnDefInPkConstraint> ret]
+@init{ret = new ArrayList<ColumnDefInPkConstraint>(); }
+ : p=col_name_with_sort_order_rowtimestamp {$ret.add(p);} (COMMA p = col_name_with_sort_order_rowtimestamp {$ret.add(p);} )*
;
-
-col_name_with_sort_order returns [Pair<ColumnName, SortOrder> ret]
- : f=identifier (order=ASC|order=DESC)? {$ret = Pair.newPair(factory.columnName(f), order == null ? SortOrder.getDefault() : SortOrder.fromDDLValue(order.getText()));}
+
+col_name_with_sort_order_rowtimestamp returns [ColumnDefInPkConstraint ret]
+ : f=identifier (order=ASC|order=DESC)? (rr=ROW_TIMESTAMP)?
+ { $ret = factory.columnDefInPkConstraint(factory.columnName(f), order == null ? SortOrder.getDefault() : SortOrder.fromDDLValue(order.getText()), rr != null); }
;
ik_constraint returns [IndexKeyConstraint ret]
@@ -601,12 +603,13 @@ column_defs returns [List<ColumnDef> ret]
;
column_def returns [ColumnDef ret]
- : c=column_name dt=identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (nn=NOT? n=NULL)? (pk=PRIMARY KEY (order=ASC|order=DESC)?)?
+ : c=column_name dt=identifier (LPAREN l=NUMBER (COMMA s=NUMBER)? RPAREN)? ar=ARRAY? (lsq=LSQUARE (a=NUMBER)? RSQUARE)? (nn=NOT? n=NULL)? (pk=PRIMARY KEY (order=ASC|order=DESC)? rr=ROW_TIMESTAMP?)?
{ $ret = factory.columnDef(c, dt, ar != null || lsq != null, a == null ? null : Integer.parseInt( a.getText() ), nn!=null ? Boolean.FALSE : n!=null ? Boolean.TRUE : null,
l == null ? null : Integer.parseInt( l.getText() ),
s == null ? null : Integer.parseInt( s.getText() ),
pk != null,
- order == null ? SortOrder.getDefault() : SortOrder.fromDDLValue(order.getText()) ); }
+ order == null ? SortOrder.getDefault() : SortOrder.fromDDLValue(order.getText()),
+ rr != null); }
;
dyn_column_defs returns [List<ColumnDef> ret]
@@ -620,7 +623,8 @@ dyn_column_def returns [ColumnDef ret]
l == null ? null : Integer.parseInt( l.getText() ),
s == null ? null : Integer.parseInt( s.getText() ),
false,
- SortOrder.getDefault()); }
+ SortOrder.getDefault(),
+ false); }
;
dyn_column_name_or_def returns [ColumnDef ret]
@@ -629,7 +633,8 @@ dyn_column_name_or_def returns [ColumnDef ret]
l == null ? null : Integer.parseInt( l.getText() ),
s == null ? null : Integer.parseInt( s.getText() ),
false,
- SortOrder.getDefault()); }
+ SortOrder.getDefault(),
+ false); }
;
subquery_expression returns [ParseNode ret]
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index ebbfd9c..96588d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.compile;
+import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
+
import java.sql.ParameterMetaData;
import java.sql.SQLException;
import java.util.Arrays;
@@ -147,11 +149,13 @@ public class DeleteCompiler {
}
table.newKey(ptr, values);
}
- mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+ // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
+ // row key will already have its value.
+ mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
if (indexTableRef != null) {
ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
rs.getCurrentRow().getKey(indexPtr);
- indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+ indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
}
if (mutations.size() > maxSize) {
throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -436,7 +440,7 @@ public class DeleteCompiler {
Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
while (iterator.hasNext()) {
- mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
+ mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
}
return new MutationState(tableRef, mutation, 0, maxSize, connection);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 9845498..dd92b00 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -220,7 +220,7 @@ public class FromCompiler {
Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression();
PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(),
sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
- column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr());
+ column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced(), column.getExpressionStr(), column.isRowTimestamp());
projectedColumns.add(projectedColumn);
}
PTable t = PTableImpl.makePTable(table, projectedColumns);
@@ -546,7 +546,7 @@ public class FromCompiler {
familyName = PNameFactory.newName(family);
}
allcolumns.add(new PColumnImpl(name, familyName, dynColumn.getDataType(), dynColumn.getMaxLength(),
- dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression()));
+ dynColumn.getScale(), dynColumn.isNull(), position, dynColumn.getSortOrder(), dynColumn.getArraySize(), null, false, dynColumn.getExpression(), false));
position++;
}
theTable = PTableImpl.makePTable(theTable, allcolumns);
@@ -644,7 +644,7 @@ public class FromCompiler {
}
PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias),
PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY),
- null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null);
+ null, 0, 0, true, position++, SortOrder.ASC, null, null, false, null, false);
columns.add(column);
}
PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
index 4683320..b09771a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
@@ -78,7 +78,7 @@ public class ListJarsQueryPlan implements QueryPlan {
PColumn column =
new PColumnImpl(PNameFactory.newName("jar_location"), null,
PVarchar.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
- false, null);
+ false, null, false);
List<PColumn> columns = new ArrayList<PColumn>();
columns.add(column);
Expression expression =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index fcbeb7e..d75fe38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -125,6 +125,7 @@ public class PostDDLCompiler {
return Collections.singletonList(tableRef);
}
+ @Override
public java.util.List<PFunction> getFunctions() {
return Collections.emptyList();
};
@@ -142,10 +143,12 @@ public class PostDDLCompiler {
return new ColumnRef(tableRef, column.getPosition());
}
+ @Override
public PFunction resolveFunction(String functionName) throws SQLException {
throw new UnsupportedOperationException();
};
+ @Override
public boolean hasUDFs() {
return false;
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index 9f99f1c..f5bb4c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -114,8 +114,9 @@ public class PostIndexDDLCompiler {
this.selectQuery = selectQueryBuilder.toString();
updateStmtStr.append(this.selectQuery);
- final PhoenixStatement statement = new PhoenixStatement(connection);
- return statement.compileMutation(updateStmtStr.toString());
+ try (final PhoenixStatement statement = new PhoenixStatement(connection)) {
+ return statement.compileMutation(updateStmtStr.toString());
+ }
}
public List<String> getIndexColumnNames() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index 2a032a3..4d343f3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -19,15 +19,19 @@ package org.apache.phoenix.compile;
import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.query.KeyRange;
@@ -35,11 +39,14 @@ import org.apache.phoenix.query.KeyRange.Bound;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SaltingUtil;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.ScanUtil.BytesComparator;
import org.apache.phoenix.util.SchemaUtil;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -48,20 +55,20 @@ import com.google.common.collect.Lists;
public class ScanRanges {
private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
- public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, KeyRange.EVERYTHING_RANGE, KeyRange.EVERYTHING_RANGE, false, false, null);
- public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, KeyRange.EMPTY_RANGE, KeyRange.EMPTY_RANGE, false, false, null);
+ public static final ScanRanges EVERYTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,EVERYTHING_RANGES, KeyRange.EVERYTHING_RANGE, KeyRange.EVERYTHING_RANGE, false, false, null, null);
+ public static final ScanRanges NOTHING = new ScanRanges(null,ScanUtil.SINGLE_COLUMN_SLOT_SPAN,NOTHING_RANGES, KeyRange.EMPTY_RANGE, KeyRange.EMPTY_RANGE, false, false, null, null);
private static final Scan HAS_INTERSECTION = new Scan();
public static ScanRanges createPointLookup(List<KeyRange> keys) {
- return ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true);
+ return ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1);
}
// For testing
public static ScanRanges createSingleSpan(RowKeySchema schema, List<List<KeyRange>> ranges) {
- return create(schema, ranges, ScanUtil.getDefaultSlotSpans(ranges.size()), KeyRange.EVERYTHING_RANGE, null, true);
+ return create(schema, ranges, ScanUtil.getDefaultSlotSpans(ranges.size()), KeyRange.EVERYTHING_RANGE, null, true, -1);
}
- public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, KeyRange minMaxRange, Integer nBuckets, boolean useSkipScan) {
+ public static ScanRanges create(RowKeySchema schema, List<List<KeyRange>> ranges, int[] slotSpan, KeyRange minMaxRange, Integer nBuckets, boolean useSkipScan, int rowTimestampColIndex) {
int offset = nBuckets == null ? 0 : SaltingUtil.NUM_SALTING_BYTES;
int nSlots = ranges.size();
if (nSlots == offset && minMaxRange == KeyRange.EVERYTHING_RANGE) {
@@ -69,6 +76,7 @@ public class ScanRanges {
} else if (minMaxRange == KeyRange.EMPTY_RANGE || (nSlots == 1 + offset && ranges.get(offset).size() == 1 && ranges.get(offset).get(0) == KeyRange.EMPTY_RANGE)) {
return NOTHING;
}
+ TimeRange rowTimestampRange = getRowTimestampColumnRange(ranges, schema, rowTimestampColIndex);
boolean isPointLookup = isPointLookup(schema, ranges, slotSpan, useSkipScan);
if (isPointLookup) {
// TODO: consider keeping original to use for serialization as it would be smaller?
@@ -111,6 +119,7 @@ public class ScanRanges {
sortedRanges.add(ImmutableList.copyOf(sorted));
}
+
// Don't set minMaxRange for point lookup because it causes issues during intersect
// by going across region boundaries
KeyRange scanRange = KeyRange.EVERYTHING_RANGE;
@@ -139,7 +148,7 @@ public class ScanRanges {
if (scanRange == KeyRange.EMPTY_RANGE) {
return NOTHING;
}
- return new ScanRanges(schema, slotSpan, sortedRanges, scanRange, minMaxRange, useSkipScan, isPointLookup, nBuckets);
+ return new ScanRanges(schema, slotSpan, sortedRanges, scanRange, minMaxRange, useSkipScan, isPointLookup, nBuckets, rowTimestampRange);
}
private SkipScanFilter filter;
@@ -151,13 +160,15 @@ public class ScanRanges {
private final boolean useSkipScanFilter;
private final KeyRange scanRange;
private final KeyRange minMaxRange;
+ private final TimeRange rowTimestampRange;
- private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, KeyRange scanRange, KeyRange minMaxRange, boolean useSkipScanFilter, boolean isPointLookup, Integer bucketNum) {
+ private ScanRanges (RowKeySchema schema, int[] slotSpan, List<List<KeyRange>> ranges, KeyRange scanRange, KeyRange minMaxRange, boolean useSkipScanFilter, boolean isPointLookup, Integer bucketNum, TimeRange rowTimestampRange) {
this.isPointLookup = isPointLookup;
this.isSalted = bucketNum != null;
this.useSkipScanFilter = useSkipScanFilter;
this.scanRange = scanRange;
this.minMaxRange = minMaxRange;
+ this.rowTimestampRange = rowTimestampRange;
// Only blow out the bucket values if we're using the skip scan. We need all the
// bucket values in this case because we use intersect against a key that may have
@@ -602,4 +613,89 @@ public class ScanRanges {
return false;
}
+
+ /**
+  * Derives the HBase {@link TimeRange} implied by the key ranges of the row timestamp
+  * column, so that a scan can be restricted by cell timestamp as well as by row key.
+  *
+  * @param ranges key ranges per row key slot; not modified by this method
+  * @param schema row key schema used to look up the row timestamp field and its sort order
+  * @param rowTimestampColPos slot position of the row timestamp column, or -1 if there is none
+  * @return the time range spanned by the slot's key ranges, or null when there is no row
+  *         timestamp column or no key ranges are available for it
+  */
+ private static TimeRange getRowTimestampColumnRange(List<List<KeyRange>> ranges, RowKeySchema schema, int rowTimestampColPos) {
+     if (rowTimestampColPos < 0 || ranges == null || ranges.size() <= rowTimestampColPos) {
+         return null;
+     }
+     List<KeyRange> rowTimestampColRange = ranges.get(rowTimestampColPos);
+     if (rowTimestampColRange.isEmpty()) {
+         // Nothing to derive a range from; leave the scan's time range unrestricted.
+         return null;
+     }
+     try {
+         // The caller has not sorted the slot yet, so sort a private copy and read the
+         // extremes from the sorted copy rather than from the caller's list.
+         List<KeyRange> sortedRange = new ArrayList<>(rowTimestampColRange);
+         Collections.sort(sortedRange, KeyRange.COMPARATOR);
+         Field f = schema.getField(rowTimestampColPos);
+         KeyRange lowestRange = sortedRange.get(0);
+         // NOTE(review): assumes the last range in comparator order also carries the
+         // greatest upper bound (true for non-overlapping ranges) - confirm.
+         KeyRange highestRange = sortedRange.get(sortedRange.size() - 1);
+         if (f.getSortOrder() == SortOrder.DESC) {
+             return getDescTimeRange(lowestRange, highestRange, f);
+         }
+         return getAscTimeRange(lowestRange, highestRange, f);
+     } catch (IOException e) {
+         // propagate() always throws; the explicit throw makes that visible to the compiler.
+         throw Throwables.propagate(e);
+     }
+ }
+
+ /**
+  * Builds the time range for a row timestamp column stored in ascending order.
+  *
+  * An unbounded lower end maps to 0 and an unbounded upper end maps to
+  * {@link HConstants#LATEST_TIMESTAMP}. An exclusive lower bound and an inclusive upper
+  * bound are each moved up by one via {@link #safelyIncrement(long)}.
+  *
+  * @param lowestRange key range supplying the lower bound
+  * @param highestRange key range supplying the upper bound
+  * @param f row timestamp field whose codec decodes the bounds
+  * @return the time range built from the two decoded bounds
+  * @throws IOException declared for the {@link TimeRange} constructor and propagated unchanged
+  */
+ private static TimeRange getAscTimeRange(KeyRange lowestRange, KeyRange highestRange, Field f)
+         throws IOException {
+     long minStamp = 0;
+     if (!lowestRange.lowerUnbound()) {
+         long decodedLower = f.getDataType().getCodec().decodeLong(lowestRange.getLowerRange(), 0, SortOrder.ASC);
+         minStamp = lowestRange.isLowerInclusive() ? decodedLower : safelyIncrement(decodedLower);
+     }
+
+     long maxStamp = HConstants.LATEST_TIMESTAMP;
+     if (!highestRange.upperUnbound()) {
+         long decodedUpper = f.getDataType().getCodec().decodeLong(highestRange.getUpperRange(), 0, SortOrder.ASC);
+         maxStamp = highestRange.isUpperInclusive() ? safelyIncrement(decodedUpper) : decodedUpper;
+     }
+     return new TimeRange(minStamp, maxStamp);
+ }
+
+ /**
+  * Builds the time range for a row timestamp column stored in descending order.
+  *
+  * The lower key bound, decoded with {@link SortOrder#DESC}, becomes the upper limit of the
+  * time range, and the upper key bound becomes the lower limit. An unbounded lower key end
+  * maps to {@link HConstants#LATEST_TIMESTAMP} and an unbounded upper key end maps to 0.
+  * Inclusive/exclusive adjustments go through {@link #safelyIncrement(long)}.
+  *
+  * @param lowestKeyRange key range supplying the lower key bound
+  * @param highestKeyRange key range supplying the upper key bound
+  * @param f row timestamp field whose codec decodes the bounds
+  * @return the time range built from the two decoded bounds
+  * @throws IOException declared for the {@link TimeRange} constructor and propagated unchanged
+  */
+ public static TimeRange getDescTimeRange(KeyRange lowestKeyRange, KeyRange highestKeyRange, Field f) throws IOException {
+     boolean noLowerKeyBound = lowestKeyRange.lowerUnbound();
+     boolean lowerKeyInclusive = lowestKeyRange.isLowerInclusive();
+     boolean noUpperKeyBound = highestKeyRange.upperUnbound();
+     boolean upperKeyInclusive = highestKeyRange.isUpperInclusive();
+
+     // The lower key bound drives the upper limit of the time range.
+     long maxStamp = HConstants.LATEST_TIMESTAMP;
+     if (!noLowerKeyBound) {
+         long decodedLowerKey = f.getDataType().getCodec().decodeLong(lowestKeyRange.getLowerRange(), 0, SortOrder.DESC);
+         maxStamp = lowerKeyInclusive ? safelyIncrement(decodedLowerKey) : decodedLowerKey;
+     }
+
+     // The upper key bound drives the lower limit of the time range.
+     long minStamp = 0;
+     if (!noUpperKeyBound) {
+         long decodedUpperKey = f.getDataType().getCodec().decodeLong(highestKeyRange.getUpperRange(), 0, SortOrder.DESC);
+         minStamp = upperKeyInclusive ? decodedUpperKey : safelyIncrement(decodedUpperKey);
+     }
+     return new TimeRange(minStamp, maxStamp);
+ }
+
+ /**
+  * Adds one to {@code value}, capping the result at {@link HConstants#LATEST_TIMESTAMP}.
+  *
+  * @param value timestamp to increment
+  * @return {@code value + 1}, or {@code LATEST_TIMESTAMP} when {@code value} is already at or above it
+  */
+ private static long safelyIncrement(long value) {
+     if (value >= HConstants.LATEST_TIMESTAMP) {
+         return HConstants.LATEST_TIMESTAMP;
+     }
+     return value + 1;
+ }
+
+ /**
+  * Returns the time range derived from the row timestamp column's key ranges.
+  *
+  * @return the time range passed to the constructor; may be null (the shared
+  *         EVERYTHING and NOTHING instances are constructed with null)
+  */
+ public TimeRange getRowTimestampRange() {
+     return this.rowTimestampRange;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 52bb7f2..312de45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -28,7 +28,6 @@ import java.util.TimeZone;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -80,8 +79,6 @@ public class StatementContext {
private TableRef currentTable;
private List<Pair<byte[], byte[]>> whereConditionColumns;
- private TimeRange scanTimeRange = null;
-
private Map<SelectStatement, Object> subqueryResults;
private final ReadMetricQueue readMetricsQueue;
private final OverAllQueryMetrics overAllQueryMetrics;
@@ -286,14 +283,6 @@ public class StatementContext {
return whereConditionColumns;
}
- public void setScanTimeRange(TimeRange value){
- this.scanTimeRange = value;
- }
-
- public TimeRange getScanTimeRange() {
- return this.scanTimeRange;
- }
-
public boolean isSubqueryResultAvailable(SelectStatement select) {
return subqueryResults.containsKey(select);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
index d8616b9..ba95d9b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
@@ -77,7 +77,7 @@ public class TraceQueryPlan implements QueryPlan {
PColumn column =
new PColumnImpl(PNameFactory.newName(MetricInfo.TRACE.columnName), null,
PLong.INSTANCE, null, null, false, 0, SortOrder.getDefault(), 0, null,
- false, null);
+ false, null, false);
List<PColumn> columns = new ArrayList<PColumn>();
columns.add(column);
Expression expression =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/72e7ccd1/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
index 942e244..afca97a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java
@@ -75,7 +75,7 @@ public class UnionCompiler {
String name = selectNodes == null ? colProj.getName() : selectNodes.get(i).getAlias();
PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(name), UNION_FAMILY_NAME,
sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
- i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString());
+ i, sourceExpression.getSortOrder(), 500, null, false, sourceExpression.toString(), false);
projectedColumns.add(projectedColumn);
}
Long scn = statement.getConnection().getSCN();