You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ch...@apache.org on 2020/03/28 05:06:13 UTC
[phoenix] branch master updated: PHOENIX-5802: Connection leaks in
UPSERT SELECT/DELETE paths due to MutatingParallelIteratorFactory iterator
not being closed
This is an automated email from the ASF dual-hosted git repository.
chinmayskulkarni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new affa9e8 PHOENIX-5802: Connection leaks in UPSERT SELECT/DELETE paths due to MutatingParallelIteratorFactory iterator not being closed
affa9e8 is described below
commit affa9e889efcc2ad7dac009a0d294b09447d281e
Author: Chinmay Kulkarni <ch...@gmail.com>
AuthorDate: Fri Mar 27 17:59:39 2020 -0700
PHOENIX-5802: Connection leaks in UPSERT SELECT/DELETE paths due to MutatingParallelIteratorFactory iterator not being closed
---
.../org/apache/phoenix/end2end/UpsertSelectIT.java | 2119 +++++++++++---------
.../compile/MutatingParallelIteratorFactory.java | 120 +-
.../org/apache/phoenix/compile/UpsertCompiler.java | 32 +-
.../phoenix/iterate/BaseResultIterators.java | 1 -
4 files changed, 1279 insertions(+), 993 deletions(-)
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index 41e2d3c..6f0f877 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.phoenix.end2end;
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_OPEN_PHOENIX_CONNECTIONS;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
import static org.apache.phoenix.util.TestUtil.A_VALUE;
@@ -42,6 +43,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Timestamp;
import java.util.Properties;
@@ -49,6 +51,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.monitoring.GlobalMetric;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PInteger;
@@ -57,6 +60,9 @@ import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -70,7 +76,22 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
this.allowServerSideMutations = allowServerSideMutations;
}
- @Parameters(name="UpsertSelecttIT_allowServerSideMutations={0}") // name is used by failsafe as file name in reports
+ @Before
+ public void setup() {
+ assertTrue(PhoenixRuntime.areGlobalClientMetricsBeingCollected());
+ for (GlobalMetric m : PhoenixRuntime.getGlobalPhoenixClientMetrics()) {
+ m.reset();
+ }
+ }
+
+ @After
+ public void assertNoConnLeak() {
+ assertTrue(PhoenixRuntime.areGlobalClientMetricsBeingCollected());
+ assertEquals(0, GLOBAL_OPEN_PHOENIX_CONNECTIONS.getMetric().getValue());
+ }
+
+ // name is used by failsafe as file name in reports
+ @Parameters(name="UpsertSelecttIT_allowServerSideMutations={0}")
public static synchronized Object[] data() {
return new Object[] {"true", "false"};
}
@@ -81,17 +102,17 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
}
@Test
- public void testUpsertSelecWithIndex() throws Exception {
+ public void testUpsertSelectWithIndex() throws Exception {
testUpsertSelect(true, false);
}
@Test
- public void testUpsertSelecWithIndexWithSalt() throws Exception {
+ public void testUpsertSelectWithIndexWithSalt() throws Exception {
testUpsertSelect(true, true);
}
@Test
- public void testUpsertSelecWithNoIndexWithSalt() throws Exception {
+ public void testUpsertSelectWithNoIndexWithSalt() throws Exception {
testUpsertSelect(false, true);
}
@@ -101,145 +122,163 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- String aTable = initATableValues(tenantId, saltTable ? null : splits, null, null, getUrl(), saltTable ? "salt_buckets = 2" : null);
+ String aTable = initATableValues(tenantId, saltTable ? null : splits, null,
+ null, getUrl(), saltTable ? "salt_buckets = 2" : null);
String customEntityTable = generateUniqueName();
- Connection conn = DriverManager.getConnection(getUrl(), props);
- String ddl = "create table " + customEntityTable +
- " (organization_id char(15) not null, \n" +
- " key_prefix char(3) not null,\n" +
- " custom_entity_data_id char(12) not null,\n" +
- " created_by varchar,\n" +
- " created_date date,\n" +
- " currency_iso_code char(3),\n" +
- " deleted char(1),\n" +
- " division decimal(31,10),\n" +
- " last_activity date,\n" +
- " last_update date,\n" +
- " last_update_by varchar,\n" +
- " name varchar(240),\n" +
- " owner varchar,\n" +
- " record_type_id char(15),\n" +
- " setup_owner varchar,\n" +
- " system_modstamp date,\n" +
- " b.val0 varchar,\n" +
- " b.val1 varchar,\n" +
- " b.val2 varchar,\n" +
- " b.val3 varchar,\n" +
- " b.val4 varchar,\n" +
- " b.val5 varchar,\n" +
- " b.val6 varchar,\n" +
- " b.val7 varchar,\n" +
- " b.val8 varchar,\n" +
- " b.val9 varchar\n" +
- " CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, custom_entity_data_id)) " + (saltTable ? "salt_buckets = 2" : "");
- conn.createStatement().execute(ddl);
- conn.close();
-
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ String ddl = "create table " + customEntityTable +
+ " (organization_id char(15) not null, \n" +
+ " key_prefix char(3) not null,\n" +
+ " custom_entity_data_id char(12) not null,\n" +
+ " created_by varchar,\n" +
+ " created_date date,\n" +
+ " currency_iso_code char(3),\n" +
+ " deleted char(1),\n" +
+ " division decimal(31,10),\n" +
+ " last_activity date,\n" +
+ " last_update date,\n" +
+ " last_update_by varchar,\n" +
+ " name varchar(240),\n" +
+ " owner varchar,\n" +
+ " record_type_id char(15),\n" +
+ " setup_owner varchar,\n" +
+ " system_modstamp date,\n" +
+ " b.val0 varchar,\n" +
+ " b.val1 varchar,\n" +
+ " b.val2 varchar,\n" +
+ " b.val3 varchar,\n" +
+ " b.val4 varchar,\n" +
+ " b.val5 varchar,\n" +
+ " b.val6 varchar,\n" +
+ " b.val7 varchar,\n" +
+ " b.val8 varchar,\n" +
+ " b.val9 varchar\n" +
+ " CONSTRAINT pk PRIMARY KEY " +
+ "(organization_id, key_prefix, custom_entity_data_id)) " +
+ (saltTable ? "salt_buckets = 2" : "");
+ stmt.execute(ddl);
+ }
+
String indexName = generateUniqueName();
if (createIndex) {
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute("CREATE INDEX IF NOT EXISTS " + indexName + " ON " + aTable + "(a_string)" );
- conn.close();
- }
- PreparedStatement upsertStmt;
- props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(3)); // Trigger multiple batches
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true);
- String upsert = "UPSERT INTO " + customEntityTable + "(custom_entity_data_id, key_prefix, organization_id, created_by) " +
- "SELECT substr(entity_id, 4), substr(entity_id, 1, 3), organization_id, a_string FROM " + aTable + " WHERE ?=a_string";
- if (createIndex) { // Confirm index is used
- upsertStmt = conn.prepareStatement("EXPLAIN " + upsert);
- upsertStmt.setString(1, tenantId);
- ResultSet ers = upsertStmt.executeQuery();
- assertTrue(ers.next());
- String explainPlan = QueryUtil.getExplainPlan(ers);
- assertTrue(explainPlan.contains(" SCAN OVER " + indexName));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE INDEX IF NOT EXISTS " + indexName +
+ " ON " + aTable + "(a_string)" );
+ }
}
-
- upsertStmt = conn.prepareStatement(upsert);
- upsertStmt.setString(1, A_VALUE);
- int rowsInserted = upsertStmt.executeUpdate();
- assertEquals(4, rowsInserted);
- conn.commit();
- conn.close();
-
- String query = "SELECT key_prefix, substr(custom_entity_data_id, 1, 1), created_by FROM " + customEntityTable + " WHERE organization_id = ? ";
- conn = DriverManager.getConnection(getUrl(), props);
- PreparedStatement statement = conn.prepareStatement(query);
- statement.setString(1, tenantId);
- ResultSet rs = statement.executeQuery();
-
- assertTrue (rs.next());
- assertEquals("00A", rs.getString(1));
- assertEquals("1", rs.getString(2));
- assertEquals(A_VALUE, rs.getString(3));
-
- assertTrue (rs.next());
- assertEquals("00A", rs.getString(1));
- assertEquals("2", rs.getString(2));
- assertEquals(A_VALUE, rs.getString(3));
-
- assertTrue (rs.next());
- assertEquals("00A", rs.getString(1));
- assertEquals("3", rs.getString(2));
- assertEquals(A_VALUE, rs.getString(3));
-
- assertTrue (rs.next());
- assertEquals("00A", rs.getString(1));
- assertEquals("4", rs.getString(2));
- assertEquals(A_VALUE, rs.getString(3));
+ // Trigger multiple batches
+ props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, Integer.toString(3));
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String upsert = "UPSERT INTO " + customEntityTable +
+ "(custom_entity_data_id, key_prefix, organization_id, created_by) " +
+ "SELECT substr(entity_id, 4), substr(entity_id, 1, 3), organization_id, " +
+ "a_string FROM " + aTable + " WHERE ?=a_string";
+ if (createIndex) { // Confirm index is used
+ try (PreparedStatement upsertStmt =
+ conn.prepareStatement("EXPLAIN " + upsert)) {
+ upsertStmt.setString(1, tenantId);
+ ResultSet ers = upsertStmt.executeQuery();
+ assertTrue(ers.next());
+ String explainPlan = QueryUtil.getExplainPlan(ers);
+ assertTrue(explainPlan.contains(" SCAN OVER " + indexName));
+ }
+ }
+
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ upsertStmt.setString(1, A_VALUE);
+ int rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(4, rowsInserted);
+ }
+ conn.commit();
+ }
+
+ String query = "SELECT key_prefix, substr(custom_entity_data_id, 1, 1), created_by FROM " +
+ customEntityTable + " WHERE organization_id = ? ";
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement statement = conn.prepareStatement(query)) {
+ statement.setString(1, tenantId);
+ ResultSet rs = statement.executeQuery();
+
+ assertTrue (rs.next());
+ assertEquals("00A", rs.getString(1));
+ assertEquals("1", rs.getString(2));
+ assertEquals(A_VALUE, rs.getString(3));
+
+ assertTrue (rs.next());
+ assertEquals("00A", rs.getString(1));
+ assertEquals("2", rs.getString(2));
+ assertEquals(A_VALUE, rs.getString(3));
+
+ assertTrue (rs.next());
+ assertEquals("00A", rs.getString(1));
+ assertEquals("3", rs.getString(2));
+ assertEquals(A_VALUE, rs.getString(3));
+
+ assertTrue (rs.next());
+ assertEquals("00A", rs.getString(1));
+ assertEquals("4", rs.getString(2));
+ assertEquals(A_VALUE, rs.getString(3));
- assertFalse(rs.next());
- conn.close();
+ assertFalse(rs.next());
+ }
// Test UPSERT through coprocessor
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true);
- upsert = "UPSERT INTO " + customEntityTable + "(custom_entity_data_id, key_prefix, organization_id, last_update_by, division) " +
- "SELECT custom_entity_data_id, key_prefix, organization_id, created_by, 1.0 FROM " + customEntityTable + " WHERE organization_id = ? and created_by >= 'a'";
-
- upsertStmt = conn.prepareStatement(upsert);
- upsertStmt.setString(1, tenantId);
- assertEquals(4, upsertStmt.executeUpdate());
- conn.commit();
-
- query = "SELECT key_prefix, substr(custom_entity_data_id, 1, 1), created_by, last_update_by, division FROM " + customEntityTable + " WHERE organization_id = ?";
- conn = DriverManager.getConnection(getUrl(), props);
- statement = conn.prepareStatement(query);
- statement.setString(1, tenantId);
- rs = statement.executeQuery();
-
- assertTrue (rs.next());
- assertEquals("00A", rs.getString(1));
- assertEquals("1", rs.getString(2));
- assertEquals(A_VALUE, rs.getString(3));
- assertEquals(A_VALUE, rs.getString(4));
- assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
-
- assertTrue (rs.next());
- assertEquals("00A", rs.getString(1));
- assertEquals("2", rs.getString(2));
- assertEquals(A_VALUE, rs.getString(3));
- assertEquals(A_VALUE, rs.getString(4));
- assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
-
- assertTrue (rs.next());
- assertEquals("00A", rs.getString(1));
- assertEquals("3", rs.getString(2));
- assertEquals(A_VALUE, rs.getString(3));
- assertEquals(A_VALUE, rs.getString(4));
- assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
-
- assertTrue (rs.next());
- assertEquals("00A", rs.getString(1));
- assertEquals("4", rs.getString(2));
- assertEquals(A_VALUE, rs.getString(3));
- assertEquals(A_VALUE, rs.getString(4));
- assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
-
- assertFalse(rs.next());
- conn.close();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String upsert = "UPSERT INTO " + customEntityTable +
+ "(custom_entity_data_id, key_prefix, organization_id, last_update_by, " +
+ "division) SELECT custom_entity_data_id, key_prefix, organization_id, " +
+ "created_by, 1.0 FROM " + customEntityTable + " WHERE organization_id = ?" +
+ " and created_by >= 'a'";
+
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ upsertStmt.setString(1, tenantId);
+ assertEquals(4, upsertStmt.executeUpdate());
+ }
+ conn.commit();
+ }
+
+ query = "SELECT key_prefix, substr(custom_entity_data_id, 1, 1), created_by, " +
+ "last_update_by, division FROM " + customEntityTable + " WHERE organization_id = ?";
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement statement = conn.prepareStatement(query)) {
+ statement.setString(1, tenantId);
+ ResultSet rs = statement.executeQuery();
+
+ assertTrue (rs.next());
+ assertEquals("00A", rs.getString(1));
+ assertEquals("1", rs.getString(2));
+ assertEquals(A_VALUE, rs.getString(3));
+ assertEquals(A_VALUE, rs.getString(4));
+ assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals("00A", rs.getString(1));
+ assertEquals("2", rs.getString(2));
+ assertEquals(A_VALUE, rs.getString(3));
+ assertEquals(A_VALUE, rs.getString(4));
+ assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals("00A", rs.getString(1));
+ assertEquals("3", rs.getString(2));
+ assertEquals(A_VALUE, rs.getString(3));
+ assertEquals(A_VALUE, rs.getString(4));
+ assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals("00A", rs.getString(1));
+ assertEquals("4", rs.getString(2));
+ assertEquals(A_VALUE, rs.getString(3));
+ assertEquals(A_VALUE, rs.getString(4));
+ assertTrue(BigDecimal.valueOf(1.0).compareTo(rs.getBigDecimal(5)) == 0);
+
+ assertFalse(rs.next());
+ }
}
// TODO: more tests - nullable fixed length last PK column
@@ -252,158 +291,165 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
- String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " +
- "SELECT current_date(), x_integer+2, entity_id FROM " + aTable + " WHERE a_integer >= ?";
- PreparedStatement upsertStmt = conn.prepareStatement(upsert);
- upsertStmt.setInt(1, 6);
- int rowsInserted = upsertStmt.executeUpdate();
- assertEquals(4, rowsInserted);
- conn.commit();
- conn.close();
-
+ String upsert;
+ ResultSet rs;
+ int rowsInserted;
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " +
+ "SELECT current_date(), x_integer+2, entity_id FROM " + aTable +
+ " WHERE a_integer >= ?";
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ upsertStmt.setInt(1, 6);
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(4, rowsInserted);
+ }
+ conn.commit();
+ }
+
String query = "SELECT inst,host,\"DATE\",val FROM " + ptsdbTable;
- conn = DriverManager.getConnection(getUrl(), props);
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
-
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement statement = conn.prepareStatement(query)) {
+ rs = statement.executeQuery();
+
+ Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW6, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertEquals(null, rs.getBigDecimal(4));
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW7, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(7).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW8, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(6).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW9, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(5).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertFalse(rs.next());
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, inst) " +
+ "SELECT \"DATE\"+1, val*10, host FROM " + ptsdbTable;
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(4, rowsInserted);
+ }
+ conn.commit();
+ }
Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW6, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertEquals(null, rs.getBigDecimal(4));
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW7, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(7).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW8, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(6).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW9, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(5).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertFalse(rs.next());
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true);
- upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, inst) " +
- "SELECT \"DATE\"+1, val*10, host FROM " + ptsdbTable;
- upsertStmt = conn.prepareStatement(upsert);
- rowsInserted = upsertStmt.executeUpdate();
- assertEquals(4, rowsInserted);
- conn.commit();
- conn.close();
-
Date then = new Date(now.getTime() + QueryConstants.MILLIS_IN_DAY);
query = "SELECT host,inst, \"DATE\",val FROM " + ptsdbTable + " where inst is not null";
- conn = DriverManager.getConnection(getUrl(), props);
- statement = conn.prepareStatement(query);
-
- rs = statement.executeQuery();
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW6, rs.getString(2));
- assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
- assertEquals(null, rs.getBigDecimal(4));
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW7, rs.getString(2));
- assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
- assertTrue(BigDecimal.valueOf(70).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW8, rs.getString(2));
- assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
- assertTrue(BigDecimal.valueOf(60).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW9, rs.getString(2));
- assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
- assertTrue(BigDecimal.valueOf(50).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertFalse(rs.next());
- conn.close();
-
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement statement = conn.prepareStatement(query)) {
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW6, rs.getString(2));
+ assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+ assertEquals(null, rs.getBigDecimal(4));
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW7, rs.getString(2));
+ assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+ assertTrue(BigDecimal.valueOf(70).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW8, rs.getString(2));
+ assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+ assertTrue(BigDecimal.valueOf(60).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW9, rs.getString(2));
+ assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+ assertTrue(BigDecimal.valueOf(50).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertFalse(rs.next());
+ }
+
// Should just update all values with the same value, essentially just updating the timestamp
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true);
- upsert = "UPSERT INTO " + ptsdbTable + " SELECT * FROM " + ptsdbTable;
- upsertStmt = conn.prepareStatement(upsert);
- rowsInserted = upsertStmt.executeUpdate();
- assertEquals(8, rowsInserted);
- conn.commit();
- conn.close();
-
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ upsert = "UPSERT INTO " + ptsdbTable + " SELECT * FROM " + ptsdbTable;
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(8, rowsInserted);
+ }
+ conn.commit();
+ }
+
query = "SELECT * FROM " + ptsdbTable ;
- conn = DriverManager.getConnection(getUrl(), props);
- statement = conn.prepareStatement(query);
-
- rs = statement.executeQuery();
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW6, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertEquals(null, rs.getBigDecimal(4));
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW7, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(7).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW8, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(6).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(ROW9, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(5).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(ROW6, rs.getString(1));
- assertEquals(null, rs.getString(2));
- assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
- assertEquals(null, rs.getBigDecimal(4));
-
- assertTrue (rs.next());
- assertEquals(ROW7, rs.getString(1));
- assertEquals(null, rs.getString(2));
- assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
- assertTrue(BigDecimal.valueOf(70).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(ROW8, rs.getString(1));
- assertEquals(null, rs.getString(2));
- assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
- assertTrue(BigDecimal.valueOf(60).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(ROW9, rs.getString(1));
- assertEquals(null, rs.getString(2));
- assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
- assertTrue(BigDecimal.valueOf(50).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertFalse(rs.next());
- conn.close();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement statement = conn.prepareStatement(query)) {
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW6, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertEquals(null, rs.getBigDecimal(4));
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW7, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(7).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW8, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(6).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(ROW9, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(5).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(ROW6, rs.getString(1));
+ assertEquals(null, rs.getString(2));
+ assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+ assertEquals(null, rs.getBigDecimal(4));
+
+ assertTrue (rs.next());
+ assertEquals(ROW7, rs.getString(1));
+ assertEquals(null, rs.getString(2));
+ assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+ assertTrue(BigDecimal.valueOf(70).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(ROW8, rs.getString(1));
+ assertEquals(null, rs.getString(2));
+ assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+ assertTrue(BigDecimal.valueOf(60).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(ROW9, rs.getString(1));
+ assertEquals(null, rs.getString(2));
+ assertTrue(rs.getDate(3).after(now) && rs.getDate(3).before(then));
+ assertTrue(BigDecimal.valueOf(50).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertFalse(rs.next());
+ }
}
@Test
@@ -424,245 +470,276 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(autoCommit);
- String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " +
- "SELECT current_date(), sum(a_integer), a_string FROM " + aTable + " GROUP BY a_string";
- PreparedStatement upsertStmt = conn.prepareStatement(upsert);
- int rowsInserted = upsertStmt.executeUpdate();
- assertEquals(3, rowsInserted);
- if (!autoCommit) {
- conn.commit();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(autoCommit);
+ String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host) " +
+ "SELECT current_date(), sum(a_integer), a_string FROM " + aTable +
+ " GROUP BY a_string";
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ int rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(3, rowsInserted);
+ }
+ if (!autoCommit) {
+ conn.commit();
+ }
}
- conn.close();
String query = "SELECT inst,host,\"DATE\",val FROM " + ptsdbTable;
- conn = DriverManager.getConnection(getUrl(), props);
- PreparedStatement statement = conn.prepareStatement(query);
- ResultSet rs = statement.executeQuery();
- Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(A_VALUE, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(10).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(B_VALUE, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(26).compareTo(rs.getBigDecimal(4)) == 0);
-
- assertTrue (rs.next());
- assertEquals(null, rs.getString(1));
- assertEquals(C_VALUE, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(9).compareTo(rs.getBigDecimal(4)) == 0);
- assertFalse(rs.next());
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true);
- upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host, inst) " +
- "SELECT current_date(), max(val), max(host), 'x' FROM " + ptsdbTable;
- upsertStmt = conn.prepareStatement(upsert);
- rowsInserted = upsertStmt.executeUpdate();
- assertEquals(1, rowsInserted);
- if (!autoCommit) {
- conn.commit();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement statement = conn.prepareStatement(query)) {
+ ResultSet rs = statement.executeQuery();
+ Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(A_VALUE, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(10).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(B_VALUE, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(26).compareTo(rs.getBigDecimal(4)) == 0);
+
+ assertTrue (rs.next());
+ assertEquals(null, rs.getString(1));
+ assertEquals(C_VALUE, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(9).compareTo(rs.getBigDecimal(4)) == 0);
+ assertFalse(rs.next());
}
- conn.close();
-
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ String upsert = "UPSERT INTO " + ptsdbTable + "(\"DATE\", val, host, inst) " +
+ "SELECT current_date(), max(val), max(host), 'x' FROM " + ptsdbTable;
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ int rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(1, rowsInserted);
+ }
+ if (!autoCommit) {
+ conn.commit();
+ }
+ }
+
query = "SELECT inst,host,\"DATE\",val FROM " + ptsdbTable + " WHERE inst='x'";
- conn = DriverManager.getConnection(getUrl(), props);
- statement = conn.prepareStatement(query);
- rs = statement.executeQuery();
- now = new Date(EnvironmentEdgeManager.currentTimeMillis());
-
- assertTrue (rs.next());
- assertEquals("x", rs.getString(1));
- assertEquals(C_VALUE, rs.getString(2));
- assertTrue(rs.getDate(3).before(now) );
- assertTrue(BigDecimal.valueOf(26).compareTo(rs.getBigDecimal(4)) == 0);
- assertFalse(rs.next());
-
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement statement = conn.prepareStatement(query)) {
+ ResultSet rs = statement.executeQuery();
+ Date now = new Date(EnvironmentEdgeManager.currentTimeMillis());
+
+ assertTrue (rs.next());
+ assertEquals("x", rs.getString(1));
+ assertEquals(C_VALUE, rs.getString(2));
+ assertTrue(rs.getDate(3).before(now) );
+ assertTrue(BigDecimal.valueOf(26).compareTo(rs.getBigDecimal(4)) == 0);
+ assertFalse(rs.next());
+ }
}
@Test
public void testUpsertSelectLongToInt() throws Exception {
- byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2),
- PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
+ byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1),
+ PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3),
+ PInteger.INSTANCE.toBytes(4)};
String tableName = generateUniqueName();
ensureTableCreated(getUrl(), tableName, "IntKeyTest", splits, null);
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
String upsert = "UPSERT INTO " + tableName + " VALUES(1)";
- PreparedStatement upsertStmt = conn.prepareStatement(upsert);
- int rowsInserted = upsertStmt.executeUpdate();
- assertEquals(1, rowsInserted);
- conn.commit();
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
+ int rowsInserted;
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(1, rowsInserted);
+ conn.commit();
+ }
+
upsert = "UPSERT INTO " + tableName + " select i+1 from " + tableName;
- upsertStmt = conn.prepareStatement(upsert);
- rowsInserted = upsertStmt.executeUpdate();
- assertEquals(1, rowsInserted);
- conn.commit();
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(1, rowsInserted);
+ conn.commit();
+ }
+
String select = "SELECT i FROM " + tableName;
- ResultSet rs = conn.createStatement().executeQuery(select);
- assertTrue(rs.next());
- assertEquals(1,rs.getInt(1));
- assertTrue(rs.next());
- assertEquals(2,rs.getInt(1));
- assertFalse(rs.next());
- conn.close();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery(select);
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertTrue(rs.next());
+ assertEquals(2,rs.getInt(1));
+ assertFalse(rs.next());
+ }
}
@Test
public void testUpsertSelectRunOnServer() throws Exception {
- byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2),
- PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
+ byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1),
+ PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3),
+ PInteger.INSTANCE.toBytes(4)};
String tableName = generateUniqueName();
- createTestTable(getUrl(), "create table " + tableName + " (i integer not null primary key desc, j integer)", splits, null);
+ createTestTable(getUrl(), "create table " + tableName +
+ " (i integer not null primary key desc, j integer)", splits, null);
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ ResultSet rs;
+ int rowsInserted;
String upsert = "UPSERT INTO " + tableName + " VALUES(1, 1)";
- PreparedStatement upsertStmt = conn.prepareStatement(upsert);
- int rowsInserted = upsertStmt.executeUpdate();
- assertEquals(1, rowsInserted);
- conn.commit();
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(1, rowsInserted);
+ conn.commit();
+ }
+
String select = "SELECT i,j+1 FROM " + tableName;
- ResultSet rs = conn.createStatement().executeQuery(select);
- assertTrue(rs.next());
- assertEquals(1,rs.getInt(1));
- assertEquals(2,rs.getInt(2));
- assertFalse(rs.next());
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true); // Force to run on server side.
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery(select);
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertEquals(2,rs.getInt(2));
+ assertFalse(rs.next());
+ }
+
upsert = "UPSERT INTO " + tableName + "(i,j) select i, j+1 from " + tableName;
- upsertStmt = conn.prepareStatement(upsert);
- rowsInserted = upsertStmt.executeUpdate();
- assertEquals(1, rowsInserted);
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true); // Force to run on server side.
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(1, rowsInserted);
+ }
+ }
+
select = "SELECT j FROM " + tableName;
- rs = conn.createStatement().executeQuery(select);
- assertTrue(rs.next());
- assertEquals(2,rs.getInt(1));
- assertFalse(rs.next());
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true); // Force to run on server side.
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery(select);
+ assertTrue(rs.next());
+ assertEquals(2,rs.getInt(1));
+ assertFalse(rs.next());
+ }
+
upsert = "UPSERT INTO " + tableName + "(i,j) select i, i from " + tableName;
- upsertStmt = conn.prepareStatement(upsert);
- rowsInserted = upsertStmt.executeUpdate();
- assertEquals(1, rowsInserted);
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true); // Force to run on server side.
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(1, rowsInserted);
+ }
+ }
+
select = "SELECT j FROM " + tableName;
- rs = conn.createStatement().executeQuery(select);
- assertTrue(rs.next());
- assertEquals(1,rs.getInt(1));
- assertFalse(rs.next());
- conn.close();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery(select);
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertFalse(rs.next());
+ }
}
@Test
public void testUpsertSelectOnDescToAsc() throws Exception {
- byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2),
- PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
+ byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1),
+ PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3),
+ PInteger.INSTANCE.toBytes(4)};
String tableName = generateUniqueName();
- createTestTable(getUrl(), "create table " + tableName + " (i integer not null primary key desc, j integer)", splits, null);
+ createTestTable(getUrl(), "create table " + tableName +
+ " (i integer not null primary key desc, j integer)", splits, null);
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ ResultSet rs;
+ int rowsInserted;
String upsert = "UPSERT INTO " + tableName + " VALUES(1, 1)";
- PreparedStatement upsertStmt = conn.prepareStatement(upsert);
- int rowsInserted = upsertStmt.executeUpdate();
- assertEquals(1, rowsInserted);
- conn.commit();
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true); // Force to run on server side.
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(1, rowsInserted);
+ conn.commit();
+ }
+
upsert = "UPSERT INTO " + tableName + " (i,j) select i+1, j+1 from " + tableName;
- upsertStmt = conn.prepareStatement(upsert);
- rowsInserted = upsertStmt.executeUpdate();
- assertEquals(1, rowsInserted);
- conn.commit();
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true); // Force to run on server side.
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(1, rowsInserted);
+ }
+ conn.commit();
+ }
+
String select = "SELECT i,j FROM " + tableName;
- ResultSet rs = conn.createStatement().executeQuery(select);
- assertTrue(rs.next());
- assertEquals(2,rs.getInt(1));
- assertEquals(2,rs.getInt(2));
- assertTrue(rs.next());
- assertEquals(1,rs.getInt(1));
- assertEquals(1,rs.getInt(2));
- assertFalse(rs.next());
- conn.close();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery(select);
+ assertTrue(rs.next());
+ assertEquals(2,rs.getInt(1));
+ assertEquals(2,rs.getInt(2));
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertEquals(1,rs.getInt(2));
+ assertFalse(rs.next());
+ }
}
@Test
public void testUpsertSelectRowKeyMutationOnSplitedTable() throws Exception {
- byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1), PInteger.INSTANCE.toBytes(2),
- PInteger.INSTANCE.toBytes(3), PInteger.INSTANCE.toBytes(4)};
+ byte[][] splits = new byte[][] { PInteger.INSTANCE.toBytes(1),
+ PInteger.INSTANCE.toBytes(2), PInteger.INSTANCE.toBytes(3),
+ PInteger.INSTANCE.toBytes(4)};
String tableName = generateUniqueName();
ensureTableCreated(getUrl(), tableName, "IntKeyTest", splits, null, null);
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ int rowsInserted;
+ ResultSet rs;
+
String upsert = "UPSERT INTO " + tableName + " VALUES(?)";
- PreparedStatement upsertStmt = conn.prepareStatement(upsert);
- upsertStmt.setInt(1, 1);
- upsertStmt.executeUpdate();
- upsertStmt.setInt(1, 3);
- upsertStmt.executeUpdate();
- conn.commit();
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- // Normally this would force a server side update. But since this changes the PK column, it would
- // for to run on the client side.
- conn.setAutoCommit(true);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ upsertStmt.setInt(1, 1);
+ upsertStmt.executeUpdate();
+ upsertStmt.setInt(1, 3);
+ upsertStmt.executeUpdate();
+ conn.commit();
+ }
+
upsert = "UPSERT INTO " + tableName + " (i) SELECT i+1 from " + tableName;
- upsertStmt = conn.prepareStatement(upsert);
- int rowsInserted = upsertStmt.executeUpdate();
- assertEquals(2, rowsInserted);
- conn.commit();
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ // Normally this would force a server side update. But since this changes the
+ // PK column, it would for to run on the client side.
+ conn.setAutoCommit(true);
+ try (PreparedStatement upsertStmt = conn.prepareStatement(upsert)) {
+ rowsInserted = upsertStmt.executeUpdate();
+ assertEquals(2, rowsInserted);
+ }
+ conn.commit();
+ }
+
String select = "SELECT i FROM " + tableName;
- ResultSet rs = conn.createStatement().executeQuery(select);
- assertTrue(rs.next());
- assertEquals(1,rs.getInt(1));
- assertTrue(rs.next());
- assertTrue(rs.next());
- assertTrue(rs.next());
- assertEquals(4,rs.getInt(1));
- assertFalse(rs.next());
- conn.close();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery(select);
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+ assertFalse(rs.next());
+ }
}
@Test
@@ -670,64 +747,75 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
- conn.createStatement().execute("create table " + tableName + " (id varchar(10) not null primary key, val varchar(10), ts timestamp)");
- conn.close();
+ ResultSet rs;
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("create table " + tableName +
+ " (id varchar(10) not null primary key, val varchar(10), ts timestamp)");
+ }
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute("upsert into " + tableName + " values ('aaa', 'abc', current_date())");
- conn.createStatement().execute("upsert into " + tableName + " values ('bbb', 'bcd', current_date())");
- conn.createStatement().execute("upsert into " + tableName + " values ('ccc', 'cde', current_date())");
- conn.commit();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("upsert into " + tableName +
+ " values ('aaa', 'abc', current_date())");
+ stmt.execute("upsert into " + tableName +
+ " values ('bbb', 'bcd', current_date())");
+ stmt.execute("upsert into " + tableName +
+ " values ('ccc', 'cde', current_date())");
+ conn.commit();
+ }
- conn = DriverManager.getConnection(getUrl(), props);
- ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName);
-
- assertTrue(rs.next());
- assertEquals("aaa",rs.getString(1));
- assertEquals("abc",rs.getString(2));
- assertNotNull(rs.getDate(3));
-
- assertTrue(rs.next());
- assertEquals("bbb",rs.getString(1));
- assertEquals("bcd",rs.getString(2));
- assertNotNull(rs.getDate(3));
-
- assertTrue(rs.next());
- assertEquals("ccc",rs.getString(1));
- assertEquals("cde",rs.getString(2));
- assertNotNull(rs.getDate(3));
-
- assertFalse(rs.next());
- conn.close();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery("select * from " + tableName);
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute("upsert into " + tableName + " (id, ts) select id, CAST(null AS timestamp) from " + tableName + " where id <= 'bbb' limit 1");
- conn.commit();
- conn.close();
+ assertTrue(rs.next());
+ assertEquals("aaa",rs.getString(1));
+ assertEquals("abc",rs.getString(2));
+ assertNotNull(rs.getDate(3));
- conn = DriverManager.getConnection(getUrl(), props);
- rs = conn.createStatement().executeQuery("select * from " + tableName);
-
- assertTrue(rs.next());
- assertEquals("aaa",rs.getString(1));
- assertEquals("abc",rs.getString(2));
- assertNull(rs.getDate(3));
-
- assertTrue(rs.next());
- assertEquals("bbb",rs.getString(1));
- assertEquals("bcd",rs.getString(2));
- assertNotNull(rs.getDate(3));
-
- assertTrue(rs.next());
- assertEquals("ccc",rs.getString(1));
- assertEquals("cde",rs.getString(2));
- assertNotNull(rs.getDate(3));
-
- assertFalse(rs.next());
- conn.close();
+ assertTrue(rs.next());
+ assertEquals("bbb",rs.getString(1));
+ assertEquals("bcd",rs.getString(2));
+ assertNotNull(rs.getDate(3));
+
+ assertTrue(rs.next());
+ assertEquals("ccc",rs.getString(1));
+ assertEquals("cde",rs.getString(2));
+ assertNotNull(rs.getDate(3));
+
+ assertFalse(rs.next());
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("upsert into " + tableName +
+ " (id, ts) select id, CAST(null AS timestamp) from " + tableName +
+ " where id <= 'bbb' limit 1");
+ conn.commit();
+ }
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery("select * from " + tableName);
+ assertTrue(rs.next());
+ assertEquals("aaa",rs.getString(1));
+ assertEquals("abc",rs.getString(2));
+ assertNull(rs.getDate(3));
+
+ assertTrue(rs.next());
+ assertEquals("bbb",rs.getString(1));
+ assertEquals("bcd",rs.getString(2));
+ assertNotNull(rs.getDate(3));
+
+ assertTrue(rs.next());
+ assertEquals("ccc",rs.getString(1));
+ assertEquals("cde",rs.getString(2));
+ assertNotNull(rs.getDate(3));
+
+ assertFalse(rs.next());
+ }
}
@Test
@@ -735,42 +823,51 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
String t1 = generateUniqueName();
String t2 = generateUniqueName();
String seq = generateUniqueName();
- conn.createStatement().execute("create table " + t1 + " (id bigint not null primary key, v varchar)");
- conn.createStatement().execute("create table " + t2 + " (k varchar primary key)");
- conn.createStatement().execute("create sequence " + seq);
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute("upsert into " + t2 + " values ('a')");
- conn.createStatement().execute("upsert into " + t2 + " values ('b')");
- conn.createStatement().execute("upsert into " + t2 + " values ('c')");
- conn.commit();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute("upsert into " + t1 + " select next value for " + seq + " , k from " + t2);
- conn.commit();
-
- conn = DriverManager.getConnection(getUrl(), props);
- ResultSet rs = conn.createStatement().executeQuery("select * from " + t1);
-
- assertTrue(rs.next());
- assertEquals(1,rs.getLong(1));
- assertEquals("a",rs.getString(2));
-
- assertTrue(rs.next());
- assertEquals(2,rs.getLong(1));
- assertEquals("b",rs.getString(2));
-
- assertTrue(rs.next());
- assertEquals(3,rs.getLong(1));
- assertEquals("c",rs.getString(2));
-
- assertFalse(rs.next());
- conn.close();
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("create table " + t1 +
+ " (id bigint not null primary key, v varchar)");
+ stmt.execute("create table " + t2 + " (k varchar primary key)");
+ stmt.execute("create sequence " + seq);
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("upsert into " + t2 + " values ('a')");
+ stmt.execute("upsert into " + t2 + " values ('b')");
+ stmt.execute("upsert into " + t2 + " values ('c')");
+ conn.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("upsert into " + t1 + " select next value for " +
+ seq + " , k from " + t2);
+ conn.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("select * from " + t1);
+
+ assertTrue(rs.next());
+ assertEquals(1,rs.getLong(1));
+ assertEquals("a",rs.getString(2));
+
+ assertTrue(rs.next());
+ assertEquals(2,rs.getLong(1));
+ assertEquals("b",rs.getString(2));
+
+ assertTrue(rs.next());
+ assertEquals(3,rs.getLong(1));
+ assertEquals("c",rs.getString(2));
+
+ assertFalse(rs.next());
+ }
}
@Test
@@ -779,53 +876,61 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
String t1 = generateUniqueName();
String t2 = generateUniqueName();
- String ddl = "CREATE TABLE IF NOT EXISTS " + t1 + "("
- + "ORGANIZATION_ID CHAR(15) NOT NULL, QUERY_ID CHAR(15) NOT NULL, CURSOR_ORDER BIGINT NOT NULL, K1 INTEGER, V1 INTEGER "
- + "CONSTRAINT MAIN_PK PRIMARY KEY (ORGANIZATION_ID, QUERY_ID, CURSOR_ORDER) " + ") SALT_BUCKETS = 4";
- conn.createStatement().execute(ddl);
- conn.createStatement().execute(
- "CREATE TABLE " + t2
- + "(ORGANIZATION_ID CHAR(15) NOT NULL, k1 integer NOT NULL, v1 integer NOT NULL "
- + "CONSTRAINT PK PRIMARY KEY (ORGANIZATION_ID, k1, v1) ) VERSIONS=1, SALT_BUCKETS = 4");
- conn.createStatement().execute("create sequence s cache " + Integer.MAX_VALUE);
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- for (int i = 0; i < numOfRecords; i++) {
- conn.createStatement().execute(
- "UPSERT INTO " + t2 + " values ('00Dxx0000001gEH'," + i + "," + (i + 2) + ")");
- }
- conn.commit();
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true);
- conn.createStatement().execute(
- "UPSERT INTO " + t1 + " SELECT '00Dxx0000001gEH', 'MyQueryId', NEXT VALUE FOR S, k1, v1 FROM " + t2 + " ORDER BY K1, V1");
-
- conn = DriverManager.getConnection(getUrl(), props);
- ResultSet rs = conn.createStatement().executeQuery("select count(*) from " + t1);
-
- assertTrue(rs.next());
- assertEquals(numOfRecords, rs.getLong(1));
-
- ResultSet rs2 = conn.createStatement().executeQuery(
- "select cursor_order, k1, v1 from " + t1 + " order by cursor_order");
- long seq = 1;
- while (rs2.next()) {
- assertEquals(seq, rs2.getLong("cursor_order"));
- // This value should be the sequence - 1 as we said order by k1 in the UPSERT...SELECT, but is not because
- // of sequence processing.
- assertEquals(seq - 1, rs2.getLong("k1"));
- seq++;
- }
- // cleanup afrer ourselves
- conn.createStatement().execute("drop sequence s");
- conn.close();
+ String ddl = "CREATE TABLE IF NOT EXISTS " + t1 + "(ORGANIZATION_ID CHAR(15) NOT NULL, " +
+ "QUERY_ID CHAR(15) NOT NULL, CURSOR_ORDER BIGINT NOT NULL, K1 INTEGER, " +
+ "V1 INTEGER " + "CONSTRAINT MAIN_PK PRIMARY KEY (ORGANIZATION_ID, QUERY_ID, " +
+ "CURSOR_ORDER) " + ") SALT_BUCKETS = 4";
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(ddl);
+ stmt.execute(
+ "CREATE TABLE " + t2 + "(ORGANIZATION_ID CHAR(15) NOT NULL, k1 integer " +
+ "NOT NULL, v1 integer NOT NULL CONSTRAINT PK PRIMARY KEY " +
+ "(ORGANIZATION_ID, k1, v1) ) VERSIONS=1, SALT_BUCKETS = 4");
+ stmt.execute("create sequence s cache " + Integer.MAX_VALUE);
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ for (int i = 0; i < numOfRecords; i++) {
+ stmt.execute("UPSERT INTO " + t2 +
+ " values ('00Dxx0000001gEH'," + i + "," + (i + 2) + ")");
+ }
+ conn.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ conn.setAutoCommit(true);
+ stmt.execute("UPSERT INTO " + t1 +
+ " SELECT '00Dxx0000001gEH', 'MyQueryId', NEXT VALUE FOR S, k1, v1 FROM " +
+ t2 + " ORDER BY K1, V1");
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("select count(*) from " + t1);
+
+ assertTrue(rs.next());
+ assertEquals(numOfRecords, rs.getLong(1));
+
+ ResultSet rs2 = stmt.executeQuery("select cursor_order, k1, v1 from " + t1 +
+ " order by cursor_order");
+ long seq = 1;
+ while (rs2.next()) {
+ assertEquals(seq, rs2.getLong("cursor_order"));
+ // This value should be the sequence - 1 as we said order by k1 in the
+ // UPSERT...SELECT, but is not because of sequence processing.
+ assertEquals(seq - 1, rs2.getLong("k1"));
+ seq++;
+ }
+ // cleanup afrer ourselves
+ stmt.execute("drop sequence s");
+ }
}
@Test
@@ -833,10 +938,17 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
String t1 = generateUniqueName();
String t2 = generateUniqueName();
String t3 = generateUniqueName();
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().execute("CREATE TABLE " + t1 + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
- conn.createStatement().execute("CREATE TABLE " + t2 + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
- conn.createStatement().execute("CREATE TABLE " + t3 + " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE TABLE " + t1 +
+ " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK " +
+ "PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
+ stmt.execute("CREATE TABLE " + t2 +
+ " (PK1 VARCHAR NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK " +
+ "PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
+ stmt.execute("CREATE TABLE " + t3 + " (PK1 VARCHAR NOT NULL, " +
+ "PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK " +
+ "PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP " + ")) ");
}
// The timestamp of the put will be the value of the row_timestamp column.
@@ -845,8 +957,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + t1 + " (PK1, PK2, KV1) VALUES(?, ?, ?)");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + t1 +
+ " (PK1, PK2, KV1) VALUES(?, ?, ?)")) {
stmt.setString(1, "PK1");
stmt.setDate(2, rowTimestampDate);
stmt.setString(3, "KV1");
@@ -854,53 +967,66 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
conn.commit();
}
- // Upsert select data into table T2. The connection needs to be at a timestamp beyond the row timestamp. Otherwise
- // it won't see the data from table T1.
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().executeUpdate("UPSERT INTO " + t2 + " SELECT * FROM " + t1);
+ // Upsert select data into table T2. The connection needs to be at a timestamp beyond the
+ // row timestamp. Otherwise it won't see the data from table T1.
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("UPSERT INTO " + t2 + " SELECT * FROM " + t1);
conn.commit();
- // Verify the data upserted in T2. Note that we can use the same connection here because the data was
- // inserted with a timestamp of rowTimestamp and the connection uses the latest timestamp
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t2 + " WHERE PK1 = ? AND PK2 = ?");
- stmt.setString(1, "PK1");
- stmt.setDate(2, rowTimestampDate);
- ResultSet rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals("PK1", rs.getString("PK1"));
- assertEquals(rowTimestampDate, rs.getDate("PK2"));
- assertEquals("KV1", rs.getString("KV1"));
+ // Verify the data upserted in T2. Note that we can use the same connection here because
+ // the data was inserted with a timestamp of rowTimestamp and the connection uses
+ // the latest timestamp
+ try (PreparedStatement prepStmt = conn.prepareStatement("SELECT * FROM " + t2 +
+ " WHERE PK1 = ? AND PK2 = ?")) {
+ prepStmt.setString(1, "PK1");
+ prepStmt.setDate(2, rowTimestampDate);
+ ResultSet rs = prepStmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("PK1", rs.getString("PK1"));
+ assertEquals(rowTimestampDate, rs.getDate("PK2"));
+ assertEquals("KV1", rs.getString("KV1"));
+ }
+
}
- // Verify that you can't see the data in T2 if the connection is at a timestamp lower than the row timestamp.
+ // Verify that you can't see the data in T2 if the connection is at a timestamp
+ // lower than the row timestamp.
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(rowTimestamp-1));
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t2 + " WHERE PK1 = ? AND PK2 = ?");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t2 +
+ " WHERE PK1 = ? AND PK2 = ?")) {
stmt.setString(1, "PK1");
stmt.setDate(2, rowTimestampDate);
ResultSet rs = stmt.executeQuery();
assertFalse(rs.next());
}
- // Upsert select data into table T3. The connection needs to be at a timestamp beyond the row timestamp. Otherwise
- // it won't see the data from table T1.
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().executeUpdate("UPSERT INTO " + t3 + " SELECT * FROM " + t1);
+ // Upsert select data into table T3. The connection needs to be at a timestamp beyond the
+ // row timestamp. Otherwise it won't see the data from table T1.
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("UPSERT INTO " + t3 + " SELECT * FROM " + t1);
conn.commit();
- // Verify the data upserted in T3. Note that we can use the same connection here because the data was
- // inserted with a timestamp of rowTimestamp and the connection uses the latest timestamp
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t3 + " WHERE PK1 = ? AND PK2 = ?");
- stmt.setString(1, "PK1");
- stmt.setDate(2, rowTimestampDate);
- ResultSet rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals("PK1", rs.getString("PK1"));
- assertEquals(rowTimestampDate, rs.getDate("PK2"));
- assertEquals("KV1", rs.getString("KV1"));
+ // Verify the data upserted in T3. Note that we can use the same connection here
+ // because the data was inserted with a timestamp of rowTimestamp and the connection
+ // uses the latest timestamp
+ try (PreparedStatement prepStmt = conn.prepareStatement("SELECT * FROM " + t3 +
+ " WHERE PK1 = ? AND PK2 = ?")) {
+ prepStmt.setString(1, "PK1");
+ prepStmt.setDate(2, rowTimestampDate);
+ ResultSet rs = prepStmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals("PK1", rs.getString("PK1"));
+ assertEquals(rowTimestampDate, rs.getDate("PK2"));
+ assertEquals("KV1", rs.getString("KV1"));
+ }
}
- // Verify that you can't see the data in T2 if the connection is at next timestamp (which is lower than the row timestamp).
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t3 + " WHERE PK1 = ? AND PK2 = ?");
+ // Verify that you can't see the data in T2 if the connection is at next timestamp
+ // (which is lower than the row timestamp).
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + t3 +
+ " WHERE PK1 = ? AND PK2 = ?")) {
stmt.setString(1, "PK1");
stmt.setDate(2, rowTimestampDate);
ResultSet rs = stmt.executeQuery();
@@ -914,15 +1040,19 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE TABLE " + tableName +
+ " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, KV1 VARCHAR CONSTRAINT PK " +
+ "PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP)) ");
}
// The timestamp of the put will be the value of the row_timestamp column.
long rowTimestamp = 100;
Date rowTimestampDate = new Date(rowTimestamp);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, PK2, KV1) VALUES(?, ?, ?)");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName +
+ " (PK1, PK2, KV1) VALUES(?, ?, ?)")) {
stmt.setInt(1, 1);
stmt.setDate(2, rowTimestampDate);
stmt.setString(3, "KV1");
@@ -930,21 +1060,26 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
conn.commit();
}
String seq = generateUniqueName();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE SEQUENCE " + seq);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE SEQUENCE " + seq);
}
- // Upsert select data into table. The connection needs to be at a timestamp beyond the row timestamp. Otherwise
- // it won't see the data from table.
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR " + seq + ", PK2 FROM " + tableName);
+ // Upsert select data into table. The connection needs to be at a timestamp beyond the
+ // row timestamp. Otherwise it won't see the data from table.
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.executeUpdate("UPSERT INTO " + tableName +
+ " SELECT NEXT VALUE FOR " + seq + ", PK2 FROM " + tableName);
conn.commit();
}
// Upsert select using sequences.
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
conn.setAutoCommit(true);
for (int i = 0; i < 10; i++) {
- int count = conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR " + seq + ", PK2 FROM " + tableName);
+ int count = stmt.executeUpdate("UPSERT INTO " + tableName +
+ " SELECT NEXT VALUE FOR " + seq + ", PK2 FROM " + tableName);
assertEquals((int)Math.pow(2, i), count);
}
}
@@ -955,19 +1090,27 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
String table1 = generateUniqueName();
String table2 = generateUniqueName();
String table3 = generateUniqueName();
- try (Connection conn = DriverManager.getConnection(getUrl())) {
- conn.createStatement().execute("CREATE TABLE " + table1 + " (T1PK1 VARCHAR NOT NULL, T1PK2 DATE NOT NULL, T1KV1 VARCHAR, T1KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T1PK1, T1PK2 DESC ROW_TIMESTAMP)) ");
- conn.createStatement().execute("CREATE TABLE " + table2 + " (T2PK1 VARCHAR NOT NULL, T2PK2 DATE NOT NULL, T2KV1 VARCHAR, T2KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T2PK1, T2PK2 ROW_TIMESTAMP)) ");
- conn.createStatement().execute("CREATE TABLE " + table3 + " (T3PK1 VARCHAR NOT NULL, T3PK2 DATE NOT NULL, T3KV1 VARCHAR, T3KV2 VARCHAR CONSTRAINT PK PRIMARY KEY(T3PK1, T3PK2 DESC ROW_TIMESTAMP)) ");
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE TABLE " + table1 +
+ " (T1PK1 VARCHAR NOT NULL, T1PK2 DATE NOT NULL, T1KV1 VARCHAR, T1KV2 VARCHAR " +
+ "CONSTRAINT PK PRIMARY KEY(T1PK1, T1PK2 DESC ROW_TIMESTAMP)) ");
+ stmt.execute("CREATE TABLE " + table2 +
+ " (T2PK1 VARCHAR NOT NULL, T2PK2 DATE NOT NULL, T2KV1 VARCHAR, T2KV2 VARCHAR" +
+ " CONSTRAINT PK PRIMARY KEY(T2PK1, T2PK2 ROW_TIMESTAMP)) ");
+ stmt.execute("CREATE TABLE " + table3 +
+ " (T3PK1 VARCHAR NOT NULL, T3PK2 DATE NOT NULL, T3KV1 VARCHAR, T3KV2 VARCHAR" +
+ " CONSTRAINT PK PRIMARY KEY(T3PK1, T3PK2 DESC ROW_TIMESTAMP)) ");
}
long startTime = EnvironmentEdgeManager.currentTimeMillis();
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- // Upsert values where row_timestamp column PK2 is not set and the column names are specified
- // This should upsert data with the value for PK2 as server timestamp
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table1 + " (T1PK1, T1KV1, T1KV2) VALUES (?, ?, ?)");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table1 +
+ " (T1PK1, T1KV1, T1KV2) VALUES (?, ?, ?)")) {
+ // Upsert values where row_timestamp column PK2 is not set and the column names
+ // are specified. This should upsert data with the value for PK2 as server timestamp
stmt.setString(1, "PK1");
stmt.setString(2, "KV1");
stmt.setString(3, "KV2");
@@ -976,10 +1119,11 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
}
long endTime = EnvironmentEdgeManager.currentTimeMillis();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
- // the data in this query.
- PreparedStatement stmt = conn.prepareStatement("SELECT T1KV1, T1KV2 FROM " + table1 + " WHERE T1PK1 = ? AND T1PK2 >= ? AND T1PK2 <= ?");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("SELECT T1KV1, T1KV2 FROM " +
+ table1 + " WHERE T1PK1 = ? AND T1PK2 >= ? AND T1PK2 <= ?")) {
+ // Now query for data that was upserted above. If the row key was generated correctly
+ // then we should be able to see the data in this query.
stmt.setString(1, "PK1");
stmt.setDate(2, new Date(startTime));
stmt.setDate(3, new Date(endTime));
@@ -991,18 +1135,21 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
}
startTime = EnvironmentEdgeManager.currentTimeMillis();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- // Upsert select into table2 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the server timestamp
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table2 + " (T2PK1, T2KV1, T2KV2) SELECT T1PK1, T1KV1, T1KV2 FROM " + table1);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table2 +
+ " (T2PK1, T2KV1, T2KV2) SELECT T1PK1, T1KV1, T1KV2 FROM " + table1)) {
+ // Upsert select into table2 by not selecting the row timestamp column. In this case,
+ // the rowtimestamp column would end up being set to the server timestamp
stmt.executeUpdate();
conn.commit();
}
endTime = EnvironmentEdgeManager.currentTimeMillis();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
- // the data in this query.
- PreparedStatement stmt = conn.prepareStatement("SELECT T2KV1, T2KV2 FROM " + table2 + " WHERE T2PK1 = ? AND T2PK2 >= ? AND T2PK2 <= ?");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("SELECT T2KV1, T2KV2 FROM " +
+ table2 + " WHERE T2PK1 = ? AND T2PK2 >= ? AND T2PK2 <= ?")) {
+ // Now query for data that was upserted above. If the row key was generated correctly
+ // then we should be able to see the data in this query.
stmt.setString(1, "PK1");
stmt.setDate(2, new Date(startTime));
stmt.setDate(3, new Date(endTime));
@@ -1014,18 +1161,21 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
}
startTime = EnvironmentEdgeManager.currentTimeMillis();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- // Upsert select into table3 by not selecting the row timestamp column. In this case, the rowtimestamp column would end up being set to the server timestamp
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table3 + " (T3PK1, T3KV1, T3KV2) SELECT T2PK1, T2KV1, T2KV2 FROM " + table2);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + table3 +
+ " (T3PK1, T3KV1, T3KV2) SELECT T2PK1, T2KV1, T2KV2 FROM " + table2)) {
+ // Upsert select into table3 by not selecting the row timestamp column. In this case,
+ // the rowtimestamp column would end up being set to the server timestamp
stmt.executeUpdate();
conn.commit();
}
endTime = EnvironmentEdgeManager.currentTimeMillis();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- // Now query for data that was upserted above. If the row key was generated correctly then we should be able to see
- // the data in this query.
- PreparedStatement stmt = conn.prepareStatement("SELECT T3KV1, T3KV2 FROM " + table3 + " WHERE T3PK1 = ? AND T3PK2 >= ? AND T3PK2 <= ?");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("SELECT T3KV1, T3KV2 FROM " +
+ table3 + " WHERE T3PK1 = ? AND T3PK2 >= ? AND T3PK2 <= ?")) {
+ // Now query for data that was upserted above. If the row key was generated correctly
+ // then we should be able to see the data in this query.
stmt.setString(1, "PK1");
stmt.setDate(2, new Date(startTime));
stmt.setDate(3, new Date(endTime));
@@ -1044,9 +1194,14 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE TABLE " + tableName1 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP, PK3)) ");
- conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP, PK3)) ");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE TABLE " + tableName1 +
+ " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR" +
+ " CONSTRAINT PK PRIMARY KEY(PK1, PK2 ROW_TIMESTAMP, PK3)) ");
+ stmt.execute("CREATE TABLE " + tableName2 +
+ " (PK1 INTEGER NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR" +
+ " CONSTRAINT PK PRIMARY KEY(PK1, PK2 DESC ROW_TIMESTAMP, PK3)) ");
}
String[] tableNames = {tableName1, tableName2};
@@ -1054,8 +1209,9 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
// Upsert data with the row timestamp value set
long rowTimestamp1 = 100;
Date rowTimestampDate = new Date(rowTimestamp1);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " (PK1, PK2, PK3, KV1) VALUES(?, ?, ?, ?)");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " +
+ tableName + " (PK1, PK2, PK3, KV1) VALUES(?, ?, ?, ?)")) {
stmt.setInt(1, 1);
stmt.setDate(2, rowTimestampDate);
stmt.setInt(3, 3);
@@ -1065,18 +1221,21 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
}
long startTime = EnvironmentEdgeManager.currentTimeMillis();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
conn.setAutoCommit(true);
// Upsert select in the same table with the row_timestamp column PK2 not specified.
// This will end up creating a new row whose timestamp is the server time stamp
// which is also used for the row key
- conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " (PK1, PK3, KV1) SELECT PK1, PK3, KV1 FROM " + tableName);
+ stmt.executeUpdate("UPSERT INTO " + tableName +
+ " (PK1, PK3, KV1) SELECT PK1, PK3, KV1 FROM " + tableName);
}
long endTime = EnvironmentEdgeManager.currentTimeMillis();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName +
+ " WHERE PK1 = ? AND PK2 >= ? AND PK2<= ? AND PK3 = ?")) {
// Verify the row that was upserted above
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + tableName + " WHERE PK1 = ? AND PK2 >= ? AND PK2<= ? AND PK3 = ?");
stmt.setInt(1, 1);
stmt.setDate(2, new Date(startTime));
stmt.setDate(3, new Date(endTime));
@@ -1088,20 +1247,27 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
assertEquals("KV1", rs.getString("KV1"));
assertFalse(rs.next());
// Number of rows in the table should be 2.
- rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
+ try (Statement newStmt = conn.createStatement()) {
+ rs = newStmt.executeQuery("SELECT COUNT(*) FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ }
}
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
conn.setAutoCommit(true);
- // Upsert select in the same table with the row_timestamp column PK2 specified. This will not end up creating a new row
- // because the destination pk columns, including the row timestamp column PK2, are the same as the source column.
- conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " (PK1, PK2, PK3, KV1) SELECT PK1, PK2, PK3, KV1 FROM " + tableName);
+ // Upsert select in the same table with the row_timestamp column PK2 specified.
+ // This will not end up creating a new row because the destination pk columns,
+ // including the row timestamp column PK2, are the same as the source column.
+ stmt.executeUpdate("UPSERT INTO " + tableName +
+ " (PK1, PK2, PK3, KV1) SELECT PK1, PK2, PK3, KV1 FROM " + tableName);
}
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- // Verify that two rows were created. One with rowtimestamp1 and the other with rowtimestamp2
- ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ // Verify that two rows were created. One with rowtimestamp1 and the other
+ // with rowtimestamp2
+ ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + tableName);
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertFalse(rs.next());
@@ -1109,7 +1275,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
}
}
-
+
@Test
public void testRowTimestampColWithViewsIndexesAndSaltedTables() throws Exception {
String baseTable = generateUniqueName();
@@ -1121,25 +1287,35 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE IMMUTABLE TABLE " + baseTable + " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR, KV2 VARCHAR, KV3 VARCHAR CONSTRAINT PK PRIMARY KEY(TENANT_ID, PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT = true, SALT_BUCKETS = 8");
- conn.createStatement().execute("CREATE INDEX " + baseTableIdx + " ON " + baseTable + " (PK2, KV3) INCLUDE (KV1)");
- conn.createStatement().execute("CREATE VIEW " + globalView + " AS SELECT * FROM " + baseTable + " WHERE KV1 = 'KV1'");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE IMMUTABLE TABLE " + baseTable +
+ " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, " +
+ "KV1 VARCHAR, KV2 VARCHAR, KV3 VARCHAR CONSTRAINT PK PRIMARY KEY(TENANT_ID, " +
+ "PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT = true, SALT_BUCKETS = 8");
+ stmt.execute("CREATE INDEX " + baseTableIdx + " ON " +
+ baseTable + " (PK2, KV3) INCLUDE (KV1)");
+ stmt.execute("CREATE VIEW " + globalView + " AS SELECT * FROM " +
+ baseTable + " WHERE KV1 = 'KV1'");
}
String tenantId = "tenant1";
- try (Connection conn = getTenantConnection(tenantId)) {
- conn.createStatement().execute("CREATE VIEW " + tenantView + " AS SELECT * FROM " + baseTable);
- conn.createStatement().execute("CREATE INDEX " + tenantViewIdx + " ON " + tenantView + " (PK2, KV2) INCLUDE (KV1)");
+ try (Connection conn = getTenantConnection(tenantId);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE VIEW " + tenantView + " AS SELECT * FROM " +
+ baseTable);
+ stmt.execute("CREATE INDEX " + tenantViewIdx + " ON " +
+ tenantView + " (PK2, KV2) INCLUDE (KV1)");
}
// upsert data into base table without specifying the row timestamp column PK2
long startTime = EnvironmentEdgeManager.currentTimeMillis();
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- // Upsert select in the same table with the row_timestamp column PK2 not specified. This will end up
- // creating a new row whose timestamp is the latest timestamp (which will be used
- // for the row key too)
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + baseTable + " (TENANT_ID, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + baseTable +
+ " (TENANT_ID, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)")) {
+ // Upsert select in the same table with the row_timestamp column PK2 not specified.
+ // This will end up creating a new row whose timestamp is the latest timestamp
+ // (which will be used for the row key too)
stmt.setString(1, tenantId);
stmt.setInt(2, 3);
stmt.setString(3, "KV1");
@@ -1150,52 +1326,62 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
}
long endTime = EnvironmentEdgeManager.currentTimeMillis();
- // Verify that we can see data when querying through base table, global view and index on the base table
+ // Verify that we can see data when querying through base table, global view and index on
+ // the base table
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
// Query the base table
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable + " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?");
- stmt.setString(1, tenantId);
- stmt.setDate(2, new Date(startTime));
- stmt.setDate(3, new Date(endTime));
- stmt.setInt(4, 3);
- ResultSet rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(tenantId, rs.getString("TENANT_ID"));
- assertEquals("KV1", rs.getString("KV1"));
- assertEquals("KV2", rs.getString("KV2"));
- assertEquals("KV3", rs.getString("KV3"));
- assertFalse(rs.next());
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable +
+ " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?")) {
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, new Date(startTime));
+ stmt.setDate(3, new Date(endTime));
+ stmt.setInt(4, 3);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(tenantId, rs.getString("TENANT_ID"));
+ assertEquals("KV1", rs.getString("KV1"));
+ assertEquals("KV2", rs.getString("KV2"));
+ assertEquals("KV3", rs.getString("KV3"));
+ assertFalse(rs.next());
+ }
// Query the globalView
- stmt = conn.prepareStatement("SELECT /*+ NO_INDEX */ * FROM " + globalView + " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?");
- stmt.setString(1, tenantId);
- stmt.setDate(2, new Date(startTime));
- stmt.setDate(3, new Date(endTime));
- stmt.setInt(4, 3);
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(tenantId, rs.getString("TENANT_ID"));
- assertEquals("KV1", rs.getString("KV1"));
- assertEquals("KV2", rs.getString("KV2"));
- assertEquals("KV3", rs.getString("KV3"));
- assertFalse(rs.next());
+ try (PreparedStatement stmt = conn.prepareStatement(
+ "SELECT /*+ NO_INDEX */ * FROM " + globalView +
+ " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ?")) {
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, new Date(startTime));
+ stmt.setDate(3, new Date(endTime));
+ stmt.setInt(4, 3);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(tenantId, rs.getString("TENANT_ID"));
+ assertEquals("KV1", rs.getString("KV1"));
+ assertEquals("KV2", rs.getString("KV2"));
+ assertEquals("KV3", rs.getString("KV3"));
+ assertFalse(rs.next());
+ }
+
// Query using the index on base table
- stmt = conn.prepareStatement("SELECT KV1 FROM " + baseTable + " WHERE PK2 >= ? AND PK2 <= ? AND KV3 = ?");
- stmt.setDate(1, new Date(startTime));
- stmt.setDate(2, new Date(endTime));
- stmt.setString(3, "KV3");
- rs = stmt.executeQuery();
- QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
- assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
- assertTrue(rs.next());
- assertEquals("KV1", rs.getString("KV1"));
- assertFalse(rs.next());
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT KV1 FROM " +
+ baseTable + " WHERE PK2 >= ? AND PK2 <= ? AND KV3 = ?")) {
+ stmt.setDate(1, new Date(startTime));
+ stmt.setDate(2, new Date(endTime));
+ stmt.setString(3, "KV3");
+ ResultSet rs = stmt.executeQuery();
+ QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+ assertEquals(plan.getTableRef().getTable().getName().getString(), baseTableIdx);
+ assertTrue(rs.next());
+ assertEquals("KV1", rs.getString("KV1"));
+ assertFalse(rs.next());
+ }
}
// Verify that data can be queried using tenant view and tenant view index
- try (Connection tenantConn = getTenantConnection(tenantId)) {
+ try (Connection tenantConn = getTenantConnection(tenantId);
+ PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM " +
+ tenantView + " WHERE PK2 >= ? AND PK2 <= ? AND PK3 = ?")) {
// Query the tenant view
- PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM " + tenantView + " WHERE PK2 >= ? AND PK2 <= ? AND PK3 = ?");
stmt.setDate(1, new Date(startTime));
stmt.setDate(2, new Date(endTime));
stmt.setInt(3, 3);
@@ -1208,7 +1394,8 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
// Query using the index on the tenantView
//TODO: uncomment the code after PHOENIX-2277 is fixed
-// stmt = tenantConn.prepareStatement("SELECT KV1 FROM " + tenantView + " WHERE PK2 = ? AND KV2 = ?");
+// stmt = tenantConn.prepareStatement("SELECT KV1 FROM " + tenantView +
+// " WHERE PK2 = ? AND KV2 = ?");
// stmt.setDate(1, new Date(upsertedTs));
// stmt.setString(2, "KV2");
// rs = stmt.executeQuery();
@@ -1223,74 +1410,88 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
try (Connection tenantConn = getTenantConnection(tenantId)) {
// Upsert into tenant view where the row_timestamp column PK2 is not specified
startTime = EnvironmentEdgeManager.currentTimeMillis();
- PreparedStatement stmt = tenantConn.prepareStatement("UPSERT INTO " + tenantView + " (PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?)");
- stmt.setInt(1, 33);
- stmt.setString(2, "KV13");
- stmt.setString(3, "KV23");
- stmt.setString(4, "KV33");
- stmt.executeUpdate();
+ try (PreparedStatement stmt = tenantConn.prepareStatement("UPSERT INTO " +
+ tenantView + " (PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?)")) {
+ stmt.setInt(1, 33);
+ stmt.setString(2, "KV13");
+ stmt.setString(3, "KV23");
+ stmt.setString(4, "KV33");
+ stmt.executeUpdate();
+ }
tenantConn.commit();
upsertedTs = endTime = EnvironmentEdgeManager.currentTimeMillis();
+
// Upsert into tenant view where the row_timestamp column PK2 is specified
- stmt = tenantConn.prepareStatement("UPSERT INTO " + tenantView + " (PK2, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)");
- stmt.setDate(1, new Date(upsertedTs));
- stmt.setInt(2, 44);
- stmt.setString(3, "KV14");
- stmt.setString(4, "KV24");
- stmt.setString(5, "KV34");
- stmt.executeUpdate();
+ try (PreparedStatement stmt = tenantConn.prepareStatement("UPSERT INTO " +
+ tenantView + " (PK2, PK3, KV1, KV2, KV3) VALUES (?, ?, ?, ?, ?)")) {
+ stmt.setDate(1, new Date(upsertedTs));
+ stmt.setInt(2, 44);
+ stmt.setString(3, "KV14");
+ stmt.setString(4, "KV24");
+ stmt.setString(5, "KV34");
+ stmt.executeUpdate();
+ }
tenantConn.commit();
}
- // Verify that the data upserted using the tenant view can now be queried using base table and the base table index
+ // Verify that the data upserted using the tenant view can now be queried using base table
+ // and the base table index
Date upsertedDate;
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
// Query the base table
- PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable + " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ? ");
- stmt.setString(1, tenantId);
- stmt.setDate(2, new Date(startTime));
- stmt.setDate(3, new Date(endTime));
- stmt.setInt(4, 33);
- ResultSet rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(tenantId, rs.getString("TENANT_ID"));
- assertEquals("KV13", rs.getString("KV1"));
- assertEquals("KV23", rs.getString("KV2"));
- assertEquals("KV33", rs.getString("KV3"));
- upsertedDate = rs.getDate("PK2");
- assertFalse(rs.next());
-
- stmt = conn.prepareStatement("SELECT * FROM " + baseTable + " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ? ");
- stmt.setString(1, tenantId);
- stmt.setDate(2, new Date(upsertedTs));
- stmt.setInt(3, 44);
- rs = stmt.executeQuery();
- assertTrue(rs.next());
- assertEquals(tenantId, rs.getString("TENANT_ID"));
- assertEquals("KV14", rs.getString("KV1"));
- assertEquals("KV24", rs.getString("KV2"));
- assertEquals("KV34", rs.getString("KV3"));
- assertFalse(rs.next());
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable +
+ " WHERE TENANT_ID = ? AND PK2 >= ? AND PK2 <= ? AND PK3 = ? ")) {
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, new Date(startTime));
+ stmt.setDate(3, new Date(endTime));
+ stmt.setInt(4, 33);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(tenantId, rs.getString("TENANT_ID"));
+ assertEquals("KV13", rs.getString("KV1"));
+ assertEquals("KV23", rs.getString("KV2"));
+ assertEquals("KV33", rs.getString("KV3"));
+ upsertedDate = rs.getDate("PK2");
+ assertFalse(rs.next());
+ }
+
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM " + baseTable +
+ " WHERE TENANT_ID = ? AND PK2 = ? AND PK3 = ? ")) {
+ stmt.setString(1, tenantId);
+ stmt.setDate(2, new Date(upsertedTs));
+ stmt.setInt(3, 44);
+ ResultSet rs = stmt.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(tenantId, rs.getString("TENANT_ID"));
+ assertEquals("KV14", rs.getString("KV1"));
+ assertEquals("KV24", rs.getString("KV2"));
+ assertEquals("KV34", rs.getString("KV3"));
+ assertFalse(rs.next());
+ }
+
// Query using the index on base table
- stmt = conn.prepareStatement("SELECT KV1 FROM " + baseTable + " WHERE (PK2, KV3) IN ((?, ?), (?, ?)) ORDER BY KV1");
- stmt.setDate(1, upsertedDate);
- stmt.setString(2, "KV33");
- stmt.setDate(3, new Date(upsertedTs));
- stmt.setString(4, "KV34");
- rs = stmt.executeQuery();
- QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
- assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
- assertTrue(rs.next());
- assertEquals("KV13", rs.getString("KV1"));
- assertTrue(rs.next());
- assertEquals("KV14", rs.getString("KV1"));
- assertFalse(rs.next());
+ try (PreparedStatement stmt = conn.prepareStatement("SELECT KV1 FROM " + baseTable +
+ " WHERE (PK2, KV3) IN ((?, ?), (?, ?)) ORDER BY KV1")) {
+ stmt.setDate(1, upsertedDate);
+ stmt.setString(2, "KV33");
+ stmt.setDate(3, new Date(upsertedTs));
+ stmt.setString(4, "KV34");
+ ResultSet rs = stmt.executeQuery();
+ QueryPlan plan = stmt.unwrap(PhoenixStatement.class).getQueryPlan();
+ assertTrue(plan.getTableRef().getTable().getName().getString().equals(baseTableIdx));
+ assertTrue(rs.next());
+ assertEquals("KV13", rs.getString("KV1"));
+ assertTrue(rs.next());
+ assertEquals("KV14", rs.getString("KV1"));
+ assertFalse(rs.next());
+ }
}
// Verify that the data upserted using the tenant view can now be queried using tenant view
- try (Connection tenantConn = getTenantConnection(tenantId)) {
+ try (Connection tenantConn = getTenantConnection(tenantId);
+ PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM " +
+ tenantView + " WHERE (PK2, PK3) IN ((?, ?), (?, ?)) ORDER BY KV1")) {
// Query the base table
- PreparedStatement stmt = tenantConn.prepareStatement("SELECT * FROM " + tenantView + " WHERE (PK2, PK3) IN ((?, ?), (?, ?)) ORDER BY KV1");
stmt.setDate(1, upsertedDate);
stmt.setInt(2, 33);
stmt.setDate(3, new Date(upsertedTs));
@@ -1304,7 +1505,8 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
//TODO: uncomment the code after PHOENIX-2277 is fixed
// // Query using the index on the tenantView
-// stmt = tenantConn.prepareStatement("SELECT KV1 FROM " + tenantView + " WHERE (PK2, KV2) IN (?, ?, ?, ?) ORDER BY KV1");
+// stmt = tenantConn.prepareStatement("SELECT KV1 FROM " + tenantView +
+// " WHERE (PK2, KV2) IN (?, ?, ?, ?) ORDER BY KV1");
// stmt.setDate(1, new Date(upsertedTs));
// stmt.setString(2, "KV23");
// stmt.setDate(3, new Date(upsertedTs));
@@ -1327,20 +1529,25 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE TABLE " + tableName + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
- conn.createStatement().execute("CREATE TABLE " + tableName2 + " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("CREATE TABLE " + tableName +
+ " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
+ stmt.execute("CREATE TABLE " + tableName2 +
+ " (PK1 BIGINT NOT NULL PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR)");
}
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- long upsertedTs = 100;
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?)");
+ long upsertedTs = 100;
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName +
+ " VALUES (?, ?)")) {
stmt.setLong(1, upsertedTs);
stmt.setString(2, "KV1");
stmt.executeUpdate();
conn.commit();
}
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName2 + " SELECT (PK1 - 500), KV1 FROM " + tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName2 +
+ " SELECT (PK1 - 500), KV1 FROM " + tableName)) {
stmt.executeUpdate();
fail();
} catch (SQLException e) {
@@ -1353,49 +1560,64 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
String t1 = generateUniqueName();
- conn.createStatement().execute(
- "create table " + t1 + " (id bigint not null primary key, ca char(3)[])");
- conn.close();
+ ResultSet rs;
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute("upsert into " + t1 + " values (1, ARRAY['aaa', 'bbb'])");
- conn.commit();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("create table " + t1 +
+ " (id bigint not null primary key, ca char(3)[])");
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("upsert into " + t1 + " values (1, ARRAY['aaa', 'bbb'])");
+ conn.commit();
+ }
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute(
- "upsert into " + t1 + " (id, ca) select id, ARRAY['ccc', 'ddd'] from " + t1 + " WHERE id = 1");
- conn.commit();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("upsert into " + t1 + " (id, ca) select id, " +
+ "ARRAY['ccc', 'ddd'] from " + t1 + " WHERE id = 1");
+ conn.commit();
+ }
- conn = DriverManager.getConnection(getUrl(), props);
- ResultSet rs = conn.createStatement().executeQuery("select * from " + t1);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery("select * from " + t1);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getLong(1));
+ assertEquals("['ccc', 'ddd']", rs.getArray(2).toString());
- assertTrue(rs.next());
- assertEquals(1, rs.getLong(1));
- assertEquals("['ccc', 'ddd']", rs.getArray(2).toString());
+ }
- conn = DriverManager.getConnection(getUrl(), props);
String t2 = generateUniqueName();
- conn.createStatement().execute(
- "create table " + t2 + " (id bigint not null primary key, ba binary(4)[])");
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute("upsert into " + t2 + " values (2, ARRAY[1, 27])");
- conn.commit();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("create table " + t2 +
+ " (id bigint not null primary key, ba binary(4)[])");
+ }
- conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute(
- "upsert into " + t2 + " (id, ba) select id, ARRAY[54, 1024] from " + t2 + " WHERE id = 2");
- conn.commit();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("upsert into " + t2 + " values (2, ARRAY[1, 27])");
+ conn.commit();
+ }
- conn = DriverManager.getConnection(getUrl(), props);
- rs = conn.createStatement().executeQuery("select * from " + t2);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("upsert into " + t2 + " (id, ba) select id, " +
+ "ARRAY[54, 1024] from " + t2 + " WHERE id = 2");
+ conn.commit();
+ }
- assertTrue(rs.next());
- assertEquals(2, rs.getLong(1));
- assertEquals("[[128,0,0,54], [128,0,4,0]]", rs.getArray(2).toString());
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ rs = stmt.executeQuery("select * from " + t2);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getLong(1));
+ assertEquals("[[128,0,0,54], [128,0,4,0]]", rs.getArray(2).toString());
+ }
}
@Test
@@ -1412,37 +1634,45 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(autoCommit);
String t1 = generateUniqueName();
- conn.createStatement().execute(
- "create table " + t1 + " (id bigint not null primary key, v varchar(20))");
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(autoCommit);
- conn.createStatement().execute("upsert into " + t1 + " values (1, 'foo')");
- conn.commit();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(autoCommit);
- conn.createStatement().execute(
- "upsert into " + t1 + " (id, v) select id, '澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉' from " + t1 + " WHERE id = 1");
- conn.commit();
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(autoCommit);
- ResultSet rs = conn.createStatement().executeQuery("select * from " + t1);
-
- assertTrue(rs.next());
- assertEquals(1, rs.getLong(1));
- assertEquals("澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉", rs.getString(2));
-
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(autoCommit);
- try {
- conn.createStatement().execute(
- "upsert into " + t1 + " (id, v) select id, '澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉碉' from " + t1 + " WHERE id = 1");
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ conn.setAutoCommit(autoCommit);
+ stmt.execute("create table " + t1 +
+ " (id bigint not null primary key, v varchar(20))");
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ conn.setAutoCommit(autoCommit);
+ stmt.execute("upsert into " + t1 + " values (1, 'foo')");
+ conn.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ conn.setAutoCommit(autoCommit);
+ stmt.execute("upsert into " + t1 + " (id, v) select id, "
+ + "'澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉' from " + t1 + " WHERE id = 1");
+ conn.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ conn.setAutoCommit(autoCommit);
+ ResultSet rs = stmt.executeQuery("select * from " + t1);
+
+ assertTrue(rs.next());
+ assertEquals(1, rs.getLong(1));
+ assertEquals("澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉", rs.getString(2));
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ conn.setAutoCommit(autoCommit);
+ stmt.execute("upsert into " + t1 + " (id, v) select id, "
+ + "'澴粖蟤य褻酃岤豦팑薰鄩脼ժ끦碉碉碉碉碉碉碉' from " + t1 + " WHERE id = 1");
conn.commit();
fail();
} catch (SQLException e) {
@@ -1456,28 +1686,40 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
props.setProperty(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, Integer.toString(512));
props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
- props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS, allowServerSideMutations);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
+ props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
+ allowServerSideMutations);
String t1 = generateUniqueName();
String t2 = generateUniqueName();
String seq = generateUniqueName();
- conn.createStatement().execute("CREATE SEQUENCE " + seq);
- conn.createStatement().execute("CREATE TABLE " + t1 + " (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4");
- conn.createStatement().execute("CREATE TABLE " + t2 + " (pk INTEGER PRIMARY KEY, val INTEGER)");
- conn.close();
-
- conn = DriverManager.getConnection(getUrl(), props);
- for (int i = 0; i < 100; i++) {
- conn.createStatement().execute("UPSERT INTO " + t1 + " VALUES (NEXT VALUE FOR " + seq + ", " + (i%10) + ")");
- }
- conn.commit();
- conn.close();
- conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(true);
- int upsertCount = conn.createStatement().executeUpdate("UPSERT INTO " + t2 + " SELECT pk, val FROM " + t1);
- assertEquals(100,upsertCount);
- conn.close();
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ conn.setAutoCommit(false);
+
+ stmt.execute("CREATE SEQUENCE " + seq);
+ stmt.execute("CREATE TABLE " + t1 +
+ " (pk INTEGER PRIMARY KEY, val INTEGER) SALT_BUCKETS=4");
+ stmt.execute("CREATE TABLE " + t2 +
+ " (pk INTEGER PRIMARY KEY, val INTEGER)");
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
+ for (int i = 0; i < 100; i++) {
+ stmt.execute("UPSERT INTO " + t1 +
+ " VALUES (NEXT VALUE FOR " + seq + ", " + (i%10) + ")");
+ }
+ conn.commit();
+ }
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(true);
+ try (Statement stmt = conn.createStatement()) {
+ int upsertCount = stmt.executeUpdate("UPSERT INTO " + t2 +
+ " SELECT pk, val FROM " + t1);
+ assertEquals(100,upsertCount);
+ }
+ }
}
@Test // See https://issues.apache.org/jira/browse/PHOENIX-4265
@@ -1487,43 +1729,47 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
- conn.createStatement().execute("CREATE IMMUTABLE TABLE " + tableName
- + " (k1 TIMESTAMP not null, k2 bigint not null, v bigint, constraint pk primary key (k1 row_timestamp, k2)) SALT_BUCKETS = 9");
- conn.createStatement().execute(
- "CREATE INDEX " + indexName + " ON " + tableName + " (v) INCLUDE (k2)");
- PreparedStatement stmt =
- conn.prepareStatement("UPSERT INTO " + tableName + " VALUES (?, ?, ?) ");
- stmt.setTimestamp(1, new Timestamp(1000));
- stmt.setLong(2, 2000);
- stmt.setLong(3, 1000);
- stmt.executeUpdate();
- stmt.setTimestamp(1, new Timestamp(2000));
- stmt.setLong(2, 5000);
- stmt.setLong(3, 5);
- stmt.executeUpdate();
- stmt.setTimestamp(1, new Timestamp(3000));
- stmt.setLong(2, 5000);
- stmt.setLong(3, 5);
- stmt.executeUpdate();
- stmt.setTimestamp(1, new Timestamp(4000));
- stmt.setLong(2, 5000);
- stmt.setLong(3, 5);
- stmt.executeUpdate();
- stmt.setTimestamp(1, new Timestamp(5000));
- stmt.setLong(2, 2000);
- stmt.setLong(3, 10);
- stmt.executeUpdate();
- stmt.setTimestamp(1, new Timestamp(6000));
- stmt.setLong(2, 2000);
- stmt.setLong(3, 20);
- stmt.executeUpdate();
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement statement = conn.createStatement()) {
+ statement.execute("CREATE IMMUTABLE TABLE " + tableName +
+ " (k1 TIMESTAMP not null, k2 bigint not null, v bigint, constraint pk " +
+ "primary key (k1 row_timestamp, k2)) SALT_BUCKETS = 9");
+ statement.execute("CREATE INDEX " + indexName + " ON " + tableName +
+ " (v) INCLUDE (k2)");
+ try (PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName +
+ " VALUES (?, ?, ?) ")) {
+ stmt.setTimestamp(1, new Timestamp(1000));
+ stmt.setLong(2, 2000);
+ stmt.setLong(3, 1000);
+ stmt.executeUpdate();
+ stmt.setTimestamp(1, new Timestamp(2000));
+ stmt.setLong(2, 5000);
+ stmt.setLong(3, 5);
+ stmt.executeUpdate();
+ stmt.setTimestamp(1, new Timestamp(3000));
+ stmt.setLong(2, 5000);
+ stmt.setLong(3, 5);
+ stmt.executeUpdate();
+ stmt.setTimestamp(1, new Timestamp(4000));
+ stmt.setLong(2, 5000);
+ stmt.setLong(3, 5);
+ stmt.executeUpdate();
+ stmt.setTimestamp(1, new Timestamp(5000));
+ stmt.setLong(2, 2000);
+ stmt.setLong(3, 10);
+ stmt.executeUpdate();
+ stmt.setTimestamp(1, new Timestamp(6000));
+ stmt.setLong(2, 2000);
+ stmt.setLong(3, 20);
+ stmt.executeUpdate();
+ }
conn.commit();
- ResultSet rs = conn.createStatement().executeQuery("SELECT " +
+
+ ResultSet rs = statement.executeQuery("SELECT " +
" K2 FROM " + tableName + " WHERE V = 5");
assertTrue("Index " + indexName + " should have been used",
- rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan().getTableRef()
- .getTable().getName().getString().equals(indexName));
+ rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan().getTableRef()
+ .getTable().getName().getString().equals(indexName));
assertTrue(rs.next());
assertEquals(5000, rs.getLong("k2"));
assertTrue(rs.next());
@@ -1531,12 +1777,12 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
assertTrue(rs.next());
assertEquals(5000, rs.getLong("k2"));
assertFalse(rs.next());
- rs =
- conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName + " "
- + indexName + ") */ " + " K2 FROM " + tableName + " WHERE V = 5");
+
+ rs = statement.executeQuery("SELECT /*+ INDEX(" + tableName + " "
+ + indexName + ") */ " + " K2 FROM " + tableName + " WHERE V = 5");
assertTrue("Index " + indexName + " should have been used",
- rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan().getTableRef()
- .getTable().getName().getString().equals(indexName));
+ rs.unwrap(PhoenixResultSet.class).getStatement().getQueryPlan()
+ .getTableRef().getTable().getName().getString().equals(indexName));
assertTrue(rs.next());
assertEquals(5000, rs.getLong("k2"));
assertTrue(rs.next());
@@ -1544,6 +1790,7 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
assertTrue(rs.next());
assertEquals(5000, rs.getLong("k2"));
assertFalse(rs.next());
+
}
}
@@ -1554,13 +1801,19 @@ public class UpsertSelectIT extends ParallelStatsDisabledIT {
Properties props = new Properties();
props.setProperty(QueryServices.ENABLE_SERVER_SIDE_UPSERT_MUTATIONS,
allowServerSideMutations);
- try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ try (Connection conn = DriverManager.getConnection(getUrl(), props);
+ Statement stmt = conn.createStatement()) {
conn.setAutoCommit(true);
- conn.createStatement().execute("create table " + tableName1 + "(name varchar(160) primary key, id varchar(120), address varchar(160))");
- conn.createStatement().execute("create table " + tableName2 + "(name varchar(160) primary key, id varchar(10), address varchar(10))");
- conn.createStatement().execute("upsert into " + tableName1 + " values('test','test','test')");
- conn.createStatement().execute("upsert into " + tableName2 + " select * from " + tableName1);
- ResultSet rs = conn.createStatement().executeQuery("select * from " + tableName2);
+ stmt.execute("create table " + tableName1 +
+ "(name varchar(160) primary key, id varchar(120), address varchar(160))");
+ stmt.execute("create table " + tableName2 +
+ "(name varchar(160) primary key, id varchar(10), address varchar(10))");
+ stmt.execute("upsert into " + tableName1 +
+ " values('test','test','test')");
+ stmt.execute("upsert into " + tableName2 + " select * from " +
+ tableName1);
+ ResultSet rs = stmt.executeQuery("select * from " +
+ tableName2);
assertTrue(rs.next());
assertEquals("test", rs.getString(1));
assertEquals("test", rs.getString(2));
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
index 0ce26cb..e7aae9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/MutatingParallelIteratorFactory.java
@@ -36,11 +36,16 @@ import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Factory class used to instantiate an iterator to handle mutations made during a parallel scan.
*/
public abstract class MutatingParallelIteratorFactory implements ParallelIteratorFactory {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(
+ MutatingParallelIteratorFactory.class);
protected final PhoenixConnection connection;
protected MutatingParallelIteratorFactory(PhoenixConnection connection) {
@@ -50,62 +55,81 @@ public abstract class MutatingParallelIteratorFactory implements ParallelIterato
/**
* Method that does the actual mutation work
*/
- abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator, PhoenixConnection connection) throws SQLException;
+ abstract protected MutationState mutate(StatementContext parentContext, ResultIterator iterator,
+ PhoenixConnection connection) throws SQLException;
@Override
- public PeekingResultIterator newIterator(final StatementContext parentContext, ResultIterator iterator, Scan scan, String tableName, QueryPlan plan) throws SQLException {
+ public PeekingResultIterator newIterator(final StatementContext parentContext,
+ ResultIterator iterator, Scan scan, String tableName,
+ QueryPlan plan) throws SQLException {
+
final PhoenixConnection clonedConnection = new PhoenixConnection(this.connection);
-
- MutationState state = mutate(parentContext, iterator, clonedConnection);
-
- final long totalRowCount = state.getUpdateCount();
- final boolean autoFlush = connection.getAutoCommit() || plan.getTableRef().getTable().isTransactional();
- if (autoFlush) {
- clonedConnection.getMutationState().join(state);
- state = clonedConnection.getMutationState();
- }
- final MutationState finalState = state;
-
- byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
- Cell keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
- final Tuple tuple = new SingleKeyValueTuple(keyValue);
- return new PeekingResultIterator() {
- private boolean done = false;
-
- @Override
- public Tuple next() throws SQLException {
- if (done) {
- return null;
- }
- done = true;
- return tuple;
- }
+ try {
+ MutationState state = mutate(parentContext, iterator, clonedConnection);
- @Override
- public void explain(List<String> planSteps) {
+ final long totalRowCount = state.getUpdateCount();
+ final boolean autoFlush = connection.getAutoCommit() ||
+ plan.getTableRef().getTable().isTransactional();
+ if (autoFlush) {
+ clonedConnection.getMutationState().join(state);
+ state = clonedConnection.getMutationState();
}
+ final MutationState finalState = state;
+
+ byte[] value = PLong.INSTANCE.toBytes(totalRowCount);
+ Cell keyValue = PhoenixKeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY,
+ SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length);
+ final Tuple tuple = new SingleKeyValueTuple(keyValue);
+ return new PeekingResultIterator() {
+ private boolean done = false;
- @Override
- public void close() throws SQLException {
- try {
- /*
- * Join the child mutation states in close, since this is called in a single threaded manner
- * after the parallel results have been processed.
- * If auto-commit is on for the cloned child connection, then the finalState here is an empty mutation
- * state (with no mutations). However, it still has the metrics for mutation work done by the
- * mutating-iterator. Joining the mutation state makes sure those metrics are passed over
- * to the parent connection.
- */
- MutatingParallelIteratorFactory.this.connection.getMutationState().join(finalState);
- } finally {
- clonedConnection.close();
+ @Override
+ public Tuple next() {
+ if (done) {
+ return null;
+ }
+ done = true;
+ return tuple;
}
- }
- @Override
- public Tuple peek() throws SQLException {
- return done ? null : tuple;
+ @Override
+ public void explain(List<String> planSteps) {
+ }
+
+ @Override
+ public void close() throws SQLException {
+ try {
+ /*
+ * Join the child mutation states in close, since this is called in a single
+ * threaded manner after the parallel results have been processed.
+ * If auto-commit is on for the cloned child connection, then the finalState
+ * here is an empty mutation state (with no mutations). However, it still
+ * has the metrics for mutation work done by the mutating-iterator.
+ * Joining the mutation state makes sure those metrics are passed over
+ * to the parent connection.
+ */
+ MutatingParallelIteratorFactory.this.connection.getMutationState()
+ .join(finalState);
+ } finally {
+ clonedConnection.close();
+ }
+ }
+
+ @Override
+ public Tuple peek() {
+ return done ? null : tuple;
+ }
+ };
+ } catch (Throwable ex) {
+ // Catch just to make sure we close the cloned connection and then rethrow
+ try {
+ // closeQuietly only handles IOException
+ clonedConnection.close();
+ } catch (SQLException sqlEx) {
+ LOGGER.error("Closing cloned Phoenix connection inside iterator, failed with: ",
+ sqlEx);
}
- };
+ throw ex;
+ }
}
}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 97447f2..8e935b6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -175,8 +175,10 @@ public class UpsertCompiler {
mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
}
- public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
- ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, boolean prefixSysColValues) throws SQLException {
+ public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef,
+ RowProjector projector, ResultIterator iterator, int[] columnIndexes,
+ int[] pkSlotIndexes, boolean useServerTimestamp,
+ boolean prefixSysColValues) throws SQLException {
PhoenixStatement statement = childContext.getStatement();
PhoenixConnection connection = statement.getConnection();
ConnectionQueryServices services = connection.getQueryServices();
@@ -233,22 +235,30 @@ public class UpsertCompiler {
Integer scale = rsScale == 0 ? null : rsScale;
// We are guaranteed that the two column will have compatible types,
// as we checked that before.
- if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), SortOrder.getDefault(), precision,
- scale, column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
- SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
- .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
- .buildException(); }
+ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(),
+ SortOrder.getDefault(), precision,
+ scale, column.getMaxLength(), column.getScale())) {
+ throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(
+ column.getName().getString())
+ .setMessage("value=" + column.getDataType()
+ .toStringLiteral(ptr, null)).build()
+ .buildException();
+ }
column.getDataType().coerceBytes(ptr, value, column.getDataType(),
precision, scale, SortOrder.getDefault(),
column.getMaxLength(), column.getScale(), column.getSortOrder(),
table.rowKeyOrderOptimizable());
values[j] = ByteUtil.copyKeyBytesIfNecessary(ptr);
}
- setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, numSplColumns);
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement,
+ useServerTimestamp, indexMaintainer, viewConstants, null,
+ numSplColumns);
rowCount++;
// Commit a batch if auto commit is true and we're at our batch size
if (autoFlush && rowCount % batchSize == 0) {
- MutationState state = new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection);
+ MutationState state = new MutationState(tableRef, mutation, 0,
+ maxSize, maxSizeBytes, connection);
connection.getMutationState().join(state);
connection.getMutationState().send();
mutation.clear();
@@ -291,8 +301,8 @@ public class UpsertCompiler {
StatementContext childContext = new StatementContext(statement, false);
// Clone the row projector as it's not thread safe and would be used simultaneously by
// multiple threads otherwise.
- MutationState state = upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator, columnIndexes, pkSlotIndexes, useSeverTimestamp, false);
- return state;
+ return upsertSelect(childContext, tableRef, projector.cloneIfNecessary(), iterator,
+ columnIndexes, pkSlotIndexes, useSeverTimestamp, false);
}
public void setRowProjector(RowProjector projector) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index c13f86b..fe7425d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -1468,7 +1468,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
throw new RuntimeException(e);
} catch (ExecutionException e) {
LOGGER.info("Failed to execute task during cancel", e);
- continue;
}
}
} finally {