You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/10/07 19:15:50 UTC
[35/48] phoenix git commit: PHOENIX-3253 Make changes to tests to
support method level parallelization
PHOENIX-3253 Make changes to tests to support method level parallelization
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cae4a7c4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cae4a7c4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cae4a7c4
Branch: refs/heads/calcite
Commit: cae4a7c449b7f60e60a6cfd9b523087ff2e0d5a6
Parents: 2895835
Author: James Taylor <ja...@apache.org>
Authored: Sun Oct 2 11:10:14 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Sun Oct 2 11:10:14 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/FlappingLocalIndexIT.java | 300 +++++++++++++++++
.../phoenix/end2end/index/BaseLocalIndexIT.java | 80 +++++
.../phoenix/end2end/index/LocalIndexIT.java | 299 +----------------
.../phoenix/tx/FlappingTransactionIT.java | 328 ++++++++++++++++++
.../phoenix/tx/NotThreadSafeTransactionIT.java | 331 -------------------
pom.xml | 4 +-
6 files changed, 712 insertions(+), 630 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cae4a7c4/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
new file mode 100644
index 0000000..7509997
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.end2end.index.BaseLocalIndexIT;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Test;
+
+public class FlappingLocalIndexIT extends BaseLocalIndexIT {
+
+ public FlappingLocalIndexIT(boolean isNamespaceMapped) {
+ super(isNamespaceMapped);
+ }
+
+ @Test
+ public void testScanWhenATableHasMultipleLocalIndexes() throws Exception {
+ String tableName = schemaName + "." + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+
+ createBaseTable(tableName, null, "('e','i','o')");
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ try {
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c')");
+ conn1.commit();
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + "2 ON " + tableName + "(k3)");
+ conn1.commit();
+ conn1 = DriverManager.getConnection(getUrl());
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ } finally {
+ conn1.close();
+ }
+ }
+
+ @Test
+ public void testLocalIndexScanWithSmallChunks() throws Exception {
+ String tableName = schemaName + "." + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+
+ createBaseTable(tableName, 3, null);
+ Properties props = new Properties();
+ props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "2");
+ props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+ Connection conn1 = DriverManager.getConnection(getUrl(), props);
+ try{
+ String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
+ for (int i = 0; i < 26; i++) {
+ conn1.createStatement().execute(
+ "UPSERT INTO " + tableName + " values('"+strings[i]+"'," + i + ","
+ + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+ }
+ conn1.commit();
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + "_2 ON " + tableName + "(k3)");
+
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + tableName);
+ assertTrue(rs.next());
+
+ String query = "SELECT t_id,k1,v1 FROM " + tableName;
+ rs = conn1.createStatement().executeQuery(query);
+ for (int j = 0; j < 26; j++) {
+ assertTrue(rs.next());
+ assertEquals(strings[25 - j], rs.getString("t_id"));
+ assertEquals(25 - j, rs.getInt("k1"));
+ assertEquals(strings[j], rs.getString("V1"));
+ }
+ query = "SELECT t_id,k1,k3 FROM " + tableName;
+ rs = conn1.createStatement().executeQuery(query);
+ Thread.sleep(1000);
+ for (int j = 0; j < 26; j++) {
+ assertTrue(rs.next());
+ assertEquals(strings[j], rs.getString("t_id"));
+ assertEquals(j, rs.getInt("k1"));
+ assertEquals(j + 2, rs.getInt("k3"));
+ }
+ } finally {
+ conn1.close();
+ }
+ }
+
+ @Test
+ public void testLocalIndexScan() throws Exception {
+ String tableName = schemaName + "." + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ String indexTableName = schemaName + "." + indexName;
+ TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped);
+ String indexPhysicalTableName = physicalTableName.getNameAsString();
+
+ createBaseTable(tableName, null, "('e','i','o')");
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ try{
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('a',1,2,5,'y')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('e',1,2,3,'b')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c')");
+ conn1.commit();
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
+
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName);
+ assertTrue(rs.next());
+
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ int numRegions = admin.getTableRegions(physicalTableName).size();
+
+ String query = "SELECT * FROM " + tableName +" where v1 like 'a%'";
+ rs = conn1.createStatement().executeQuery("EXPLAIN "+ query);
+
+ assertEquals(
+ "CLIENT PARALLEL " + numRegions + "-WAY RANGE SCAN OVER "
+ + indexPhysicalTableName + " [1,'a'] - [1,'b']\n"
+ + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + "CLIENT MERGE SORT",
+ QueryUtil.getExplainPlan(rs));
+
+ rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("f", rs.getString("t_id"));
+ assertEquals(1, rs.getInt("k1"));
+ assertEquals(2, rs.getInt("k2"));
+ assertEquals("a", rs.getString("v1"));
+ assertEquals(3, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals("j", rs.getString("t_id"));
+ assertEquals(2, rs.getInt("k1"));
+ assertEquals(4, rs.getInt("k2"));
+ assertEquals("a", rs.getString("v1"));
+ assertEquals(2, rs.getInt("k3"));
+ assertFalse(rs.next());
+ query = "SELECT t_id, k1, k2,V1 FROM " + tableName +" where v1='a'";
+ rs = conn1.createStatement().executeQuery("EXPLAIN "+ query);
+
+ assertEquals(
+ "CLIENT PARALLEL " + numRegions + "-WAY RANGE SCAN OVER "
+ + indexPhysicalTableName + " [1,'a']\n"
+ + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + "CLIENT MERGE SORT",
+ QueryUtil.getExplainPlan(rs));
+
+ rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("f", rs.getString("t_id"));
+ assertEquals(1, rs.getInt("k1"));
+ assertEquals(2, rs.getInt("k2"));
+ assertTrue(rs.next());
+ assertEquals("j", rs.getString("t_id"));
+ assertEquals(2, rs.getInt("k1"));
+ assertEquals(4, rs.getInt("k2"));
+ assertFalse(rs.next());
+ query = "SELECT t_id, k1, k2,V1, k3 FROM " + tableName +" where v1<='z' order by k3";
+ rs = conn1.createStatement().executeQuery("EXPLAIN "+ query);
+
+ assertEquals("CLIENT PARALLEL " + numRegions + "-WAY RANGE SCAN OVER " + indexPhysicalTableName
+ + " [1,*] - [1,'z']\n" + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + " SERVER SORTED BY [\"K3\"]\n" + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
+
+ rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt("k3"));
+ assertTrue(rs.next());
+ assertEquals(5, rs.getInt("k3"));
+ assertFalse(rs.next());
+
+ query = "SELECT t_id, k1, k2,v1 from " + tableName + " order by V1,t_id";
+ rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
+
+ assertEquals(
+ "CLIENT PARALLEL " + numRegions + "-WAY RANGE SCAN OVER "
+ + indexPhysicalTableName +" [1]\n"
+ + " SERVER FILTER BY FIRST KEY ONLY\n"
+ + "CLIENT MERGE SORT",
+ QueryUtil.getExplainPlan(rs));
+
+ rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals("f", rs.getString("t_id"));
+ assertEquals(1, rs.getInt("k1"));
+ assertEquals(2, rs.getInt("k2"));
+ assertEquals("a", rs.getString("V1"));
+ assertTrue(rs.next());
+ assertEquals("j", rs.getString("t_id"));
+ assertEquals(2, rs.getInt("k1"));
+ assertEquals(4, rs.getInt("k2"));
+ assertEquals("a", rs.getString("V1"));
+ assertTrue(rs.next());
+ assertEquals("e", rs.getString("t_id"));
+ assertEquals(1, rs.getInt("k1"));
+ assertEquals(2, rs.getInt("k2"));
+ assertEquals("b", rs.getString("V1"));
+ assertTrue(rs.next());
+ assertEquals("q", rs.getString("t_id"));
+ assertEquals(3, rs.getInt("k1"));
+ assertEquals(1, rs.getInt("k2"));
+ assertEquals("c", rs.getString("V1"));
+ assertTrue(rs.next());
+ assertEquals("a", rs.getString("t_id"));
+ assertEquals(1, rs.getInt("k1"));
+ assertEquals(2, rs.getInt("k2"));
+ assertEquals("y", rs.getString("V1"));
+ assertTrue(rs.next());
+ assertEquals("b", rs.getString("t_id"));
+ assertEquals(1, rs.getInt("k1"));
+ assertEquals(2, rs.getInt("k2"));
+ assertEquals("z", rs.getString("V1"));
+ } finally {
+ conn1.close();
+ }
+ }
+
+ @Test
+ public void testBuildIndexWhenUserTableAlreadyHasData() throws Exception {
+ String tableName = schemaName + "." + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ String indexTableName = schemaName + "." + indexName;
+ TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped);
+ String indexPhysicalTableName = physicalTableName.getNameAsString();
+
+ createBaseTable(tableName, null, "('e','i','o')");
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
+ conn1.commit();
+ conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
+ ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+ HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+ HTable indexTable = new HTable(admin.getConfiguration(),Bytes.toBytes(indexPhysicalTableName));
+ Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
+ byte[][] startKeys = startEndKeys.getFirst();
+ byte[][] endKeys = startEndKeys.getSecond();
+ for (int i = 0; i < startKeys.length; i++) {
+ Scan s = new Scan();
+ s.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
+ s.setStartRow(startKeys[i]);
+ s.setStopRow(endKeys[i]);
+ ResultScanner scanner = indexTable.getScanner(s);
+ int count = 0;
+ for(Result r:scanner){
+ count++;
+ }
+ scanner.close();
+ assertEquals(1, count);
+ }
+ indexTable.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cae4a7c4/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
new file mode 100644
index 0000000..5c8670d
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseLocalIndexIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public abstract class BaseLocalIndexIT extends ParallelStatsDisabledIT {
+ protected boolean isNamespaceMapped;
+ protected String schemaName;
+
+ public BaseLocalIndexIT(boolean isNamespaceMapped) {
+ this.isNamespaceMapped = isNamespaceMapped;
+ }
+
+ @Before
+ public void setup() {
+ schemaName = BaseTest.generateUniqueName();
+ }
+
+ protected Connection getConnection() throws SQLException{
+ Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
+ return DriverManager.getConnection(getUrl(),props);
+ }
+
+ protected void createBaseTable(String tableName, Integer saltBuckets, String splits) throws SQLException {
+ Connection conn = getConnection();
+ if (isNamespaceMapped) {
+ conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
+ }
+ String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
+ "k1 INTEGER NOT NULL,\n" +
+ "k2 INTEGER NOT NULL,\n" +
+ "k3 INTEGER,\n" +
+ "v1 VARCHAR,\n" +
+ "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
+ + (saltBuckets != null && splits == null ? (" salt_buckets=" + saltBuckets) : ""
+ + (saltBuckets == null && splits != null ? (" split on " + splits) : ""));
+ conn.createStatement().execute(ddl);
+ conn.close();
+ }
+
+ @Parameters(name = "LocalIndexIT_isNamespaceMapped={0}") // name is used by failsafe as file name in reports
+ public static Collection<Boolean> data() {
+ return Arrays.asList(true, false);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cae4a7c4/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 234a466..bf99db0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -31,9 +31,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Properties;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -47,64 +45,25 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.hbase.index.IndexRegionSplitPolicy;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.TableNotFoundException;
-import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
-import org.junit.Before;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class LocalIndexIT extends ParallelStatsDisabledIT {
- private boolean isNamespaceMapped;
- private String schemaName;
+public class LocalIndexIT extends BaseLocalIndexIT {
public LocalIndexIT(boolean isNamespaceMapped) {
- this.isNamespaceMapped = isNamespaceMapped;
- }
-
- @Before
- public void setup() {
- schemaName = BaseTest.generateUniqueName();
+ super(isNamespaceMapped);
}
- private void createBaseTable(String tableName, Integer saltBuckets, String splits) throws SQLException {
- Connection conn = getConnection();
- if (isNamespaceMapped) {
- conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
- }
- String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" +
- "k1 INTEGER NOT NULL,\n" +
- "k2 INTEGER NOT NULL,\n" +
- "k3 INTEGER,\n" +
- "v1 VARCHAR,\n" +
- "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n"
- + (saltBuckets != null && splits == null ? (" salt_buckets=" + saltBuckets) : ""
- + (saltBuckets == null && splits != null ? (" split on " + splits) : ""));
- conn.createStatement().execute(ddl);
- conn.close();
- }
-
- @Parameters(name = "LocalIndexIT_isNamespaceMapped={0}") // name is used by failsafe as file name in reports
- public static Collection<Boolean> data() {
- return Arrays.asList(true, false);
- }
-
@Test
public void testLocalIndexRoundTrip() throws Exception {
String tableName = schemaName + "." + generateUniqueName();
@@ -192,12 +151,6 @@ public class LocalIndexIT extends ParallelStatsDisabledIT {
}
}
- private Connection getConnection() throws SQLException{
- Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
- props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
- return DriverManager.getConnection(getUrl(),props);
- }
-
@Test
public void testDropLocalIndexTable() throws Exception {
String tableName = schemaName + "." + generateUniqueName();
@@ -257,184 +210,6 @@ public class LocalIndexIT extends ParallelStatsDisabledIT {
}
@Test
- public void testBuildIndexWhenUserTableAlreadyHasData() throws Exception {
- String tableName = schemaName + "." + generateUniqueName();
- String indexName = "IDX_" + generateUniqueName();
- String indexTableName = schemaName + "." + indexName;
- TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped);
- String indexPhysicalTableName = physicalTableName.getNameAsString();
-
- createBaseTable(tableName, null, "('e','i','o')");
- Connection conn1 = DriverManager.getConnection(getUrl());
- conn1.createStatement().execute("UPSERT INTO "+tableName+" values('b',1,2,4,'z')");
- conn1.createStatement().execute("UPSERT INTO "+tableName+" values('f',1,2,3,'z')");
- conn1.createStatement().execute("UPSERT INTO "+tableName+" values('j',2,4,2,'a')");
- conn1.createStatement().execute("UPSERT INTO "+tableName+" values('q',3,1,1,'c')");
- conn1.commit();
- conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
- ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName);
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1));
- HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
- HTable indexTable = new HTable(admin.getConfiguration(),Bytes.toBytes(indexPhysicalTableName));
- Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys();
- byte[][] startKeys = startEndKeys.getFirst();
- byte[][] endKeys = startEndKeys.getSecond();
- for (int i = 0; i < startKeys.length; i++) {
- Scan s = new Scan();
- s.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES);
- s.setStartRow(startKeys[i]);
- s.setStopRow(endKeys[i]);
- ResultScanner scanner = indexTable.getScanner(s);
- int count = 0;
- for(Result r:scanner){
- count++;
- }
- scanner.close();
- assertEquals(1, count);
- }
- indexTable.close();
- }
-
- @Test
- public void testLocalIndexScan() throws Exception {
- String tableName = schemaName + "." + generateUniqueName();
- String indexName = "IDX_" + generateUniqueName();
- String indexTableName = schemaName + "." + indexName;
- TableName physicalTableName = SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped);
- String indexPhysicalTableName = physicalTableName.getNameAsString();
-
- createBaseTable(tableName, null, "('e','i','o')");
- Connection conn1 = DriverManager.getConnection(getUrl());
- try{
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('a',1,2,5,'y')");
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z')");
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a')");
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('e',1,2,3,'b')");
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a')");
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c')");
- conn1.commit();
- conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
-
- ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexTableName);
- assertTrue(rs.next());
-
- HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
- int numRegions = admin.getTableRegions(physicalTableName).size();
-
- String query = "SELECT * FROM " + tableName +" where v1 like 'a%'";
- rs = conn1.createStatement().executeQuery("EXPLAIN "+ query);
-
- assertEquals(
- "CLIENT PARALLEL " + numRegions + "-WAY RANGE SCAN OVER "
- + indexPhysicalTableName + " [1,'a'] - [1,'b']\n"
- + " SERVER FILTER BY FIRST KEY ONLY\n"
- + "CLIENT MERGE SORT",
- QueryUtil.getExplainPlan(rs));
-
- rs = conn1.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("f", rs.getString("t_id"));
- assertEquals(1, rs.getInt("k1"));
- assertEquals(2, rs.getInt("k2"));
- assertEquals("a", rs.getString("v1"));
- assertEquals(3, rs.getInt("k3"));
- assertTrue(rs.next());
- assertEquals("j", rs.getString("t_id"));
- assertEquals(2, rs.getInt("k1"));
- assertEquals(4, rs.getInt("k2"));
- assertEquals("a", rs.getString("v1"));
- assertEquals(2, rs.getInt("k3"));
- assertFalse(rs.next());
- query = "SELECT t_id, k1, k2,V1 FROM " + tableName +" where v1='a'";
- rs = conn1.createStatement().executeQuery("EXPLAIN "+ query);
-
- assertEquals(
- "CLIENT PARALLEL " + numRegions + "-WAY RANGE SCAN OVER "
- + indexPhysicalTableName + " [1,'a']\n"
- + " SERVER FILTER BY FIRST KEY ONLY\n"
- + "CLIENT MERGE SORT",
- QueryUtil.getExplainPlan(rs));
-
- rs = conn1.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("f", rs.getString("t_id"));
- assertEquals(1, rs.getInt("k1"));
- assertEquals(2, rs.getInt("k2"));
- assertTrue(rs.next());
- assertEquals("j", rs.getString("t_id"));
- assertEquals(2, rs.getInt("k1"));
- assertEquals(4, rs.getInt("k2"));
- assertFalse(rs.next());
- query = "SELECT t_id, k1, k2,V1, k3 FROM " + tableName +" where v1<='z' order by k3";
- rs = conn1.createStatement().executeQuery("EXPLAIN "+ query);
-
- assertEquals("CLIENT PARALLEL " + numRegions + "-WAY RANGE SCAN OVER " + indexPhysicalTableName
- + " [1,*] - [1,'z']\n" + " SERVER FILTER BY FIRST KEY ONLY\n"
- + " SERVER SORTED BY [\"K3\"]\n" + "CLIENT MERGE SORT", QueryUtil.getExplainPlan(rs));
-
- rs = conn1.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals(1, rs.getInt("k3"));
- assertTrue(rs.next());
- assertEquals(2, rs.getInt("k3"));
- assertTrue(rs.next());
- assertEquals(3, rs.getInt("k3"));
- assertTrue(rs.next());
- assertEquals(3, rs.getInt("k3"));
- assertTrue(rs.next());
- assertEquals(4, rs.getInt("k3"));
- assertTrue(rs.next());
- assertEquals(5, rs.getInt("k3"));
- assertFalse(rs.next());
-
- query = "SELECT t_id, k1, k2,v1 from " + tableName + " order by V1,t_id";
- rs = conn1.createStatement().executeQuery("EXPLAIN " + query);
-
- assertEquals(
- "CLIENT PARALLEL " + numRegions + "-WAY RANGE SCAN OVER "
- + indexPhysicalTableName +" [1]\n"
- + " SERVER FILTER BY FIRST KEY ONLY\n"
- + "CLIENT MERGE SORT",
- QueryUtil.getExplainPlan(rs));
-
- rs = conn1.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals("f", rs.getString("t_id"));
- assertEquals(1, rs.getInt("k1"));
- assertEquals(2, rs.getInt("k2"));
- assertEquals("a", rs.getString("V1"));
- assertTrue(rs.next());
- assertEquals("j", rs.getString("t_id"));
- assertEquals(2, rs.getInt("k1"));
- assertEquals(4, rs.getInt("k2"));
- assertEquals("a", rs.getString("V1"));
- assertTrue(rs.next());
- assertEquals("e", rs.getString("t_id"));
- assertEquals(1, rs.getInt("k1"));
- assertEquals(2, rs.getInt("k2"));
- assertEquals("b", rs.getString("V1"));
- assertTrue(rs.next());
- assertEquals("q", rs.getString("t_id"));
- assertEquals(3, rs.getInt("k1"));
- assertEquals(1, rs.getInt("k2"));
- assertEquals("c", rs.getString("V1"));
- assertTrue(rs.next());
- assertEquals("a", rs.getString("t_id"));
- assertEquals(1, rs.getInt("k1"));
- assertEquals(2, rs.getInt("k2"));
- assertEquals("y", rs.getString("V1"));
- assertTrue(rs.next());
- assertEquals("b", rs.getString("t_id"));
- assertEquals(1, rs.getInt("k1"));
- assertEquals(2, rs.getInt("k2"));
- assertEquals("z", rs.getString("V1"));
- } finally {
- conn1.close();
- }
- }
-
- @Test
public void testLocalIndexScanJoinColumnsFromDataTable() throws Exception {
String tableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
@@ -670,31 +445,6 @@ public class LocalIndexIT extends ParallelStatsDisabledIT {
}
@Test
- public void testScanWhenATableHasMultipleLocalIndexes() throws Exception {
- String tableName = schemaName + "." + generateUniqueName();
- String indexName = "IDX_" + generateUniqueName();
-
- createBaseTable(tableName, null, "('e','i','o')");
- Connection conn1 = DriverManager.getConnection(getUrl());
- try {
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('b',1,2,4,'z')");
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('f',1,2,3,'a')");
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('j',2,4,2,'a')");
- conn1.createStatement().execute("UPSERT INTO " + tableName + " values('q',3,1,1,'c')");
- conn1.commit();
- conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
- conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + "2 ON " + tableName + "(k3)");
- conn1.commit();
- conn1 = DriverManager.getConnection(getUrl());
- ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1));
- } finally {
- conn1.close();
- }
- }
-
- @Test
public void testLocalIndexesOnTableWithImmutableRows() throws Exception {
String tableName = schemaName + "." + generateUniqueName();
String indexName = "IDX_" + generateUniqueName();
@@ -791,49 +541,4 @@ public class LocalIndexIT extends ParallelStatsDisabledIT {
}
}
- @Test
- public void testLocalIndexScanWithSmallChunks() throws Exception {
- String tableName = schemaName + "." + generateUniqueName();
- String indexName = "IDX_" + generateUniqueName();
-
- createBaseTable(tableName, 3, null);
- Properties props = new Properties();
- props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "2");
- props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
- Connection conn1 = DriverManager.getConnection(getUrl(), props);
- try{
- String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"};
- for (int i = 0; i < 26; i++) {
- conn1.createStatement().execute(
- "UPSERT INTO " + tableName + " values('"+strings[i]+"'," + i + ","
- + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
- }
- conn1.commit();
- conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
- conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + "_2 ON " + tableName + "(k3)");
-
- ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + tableName);
- assertTrue(rs.next());
-
- String query = "SELECT t_id,k1,v1 FROM " + tableName;
- rs = conn1.createStatement().executeQuery(query);
- for (int j = 0; j < 26; j++) {
- assertTrue(rs.next());
- assertEquals(strings[25 - j], rs.getString("t_id"));
- assertEquals(25 - j, rs.getInt("k1"));
- assertEquals(strings[j], rs.getString("V1"));
- }
- query = "SELECT t_id,k1,k3 FROM " + tableName;
- rs = conn1.createStatement().executeQuery(query);
- Thread.sleep(1000);
- for (int j = 0; j < 26; j++) {
- assertTrue(rs.next());
- assertEquals(strings[j], rs.getString("t_id"));
- assertEquals(j, rs.getInt("k1"));
- assertEquals(j + 2, rs.getInt("k3"));
- }
- } finally {
- conn1.close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cae4a7c4/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
new file mode 100644
index 0000000..5a990cf
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/FlappingTransactionIT.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.tx;
+
+import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.apache.phoenix.util.TestUtil.createTransactionalTable;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.junit.Test;
+
+/**
+ *
+ * Transaction related tests that flap when run in parallel.
+ * TODO: review with Tephra community
+ *
+ */
+public class FlappingTransactionIT extends ParallelStatsDisabledIT {
+ @Test
+ public void testDelete() throws Exception {
+ String transTableName = generateUniqueName();
+ String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
+ String selectSQL = "SELECT * FROM " + fullTableName;
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ Connection conn2 = DriverManager.getConnection(getUrl())) {
+ TestUtil.createTransactionalTable(conn, fullTableName);
+ conn1.setAutoCommit(false);
+ ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+ assertFalse(rs.next());
+
+ String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn1.prepareStatement(upsert);
+ // upsert two rows
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.execute();
+ conn1.commit();
+
+ TestUtil.setRowKeyColumns(stmt, 2);
+ stmt.execute();
+
+ // verify rows can be read even though commit has not been called
+ int rowsDeleted = conn1.createStatement().executeUpdate("DELETE FROM " + fullTableName);
+ assertEquals(2, rowsDeleted);
+
+ // Delete and second upsert not committed yet, so there should be one row.
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+
+ conn1.commit();
+
+ // verify rows are deleted after commit
+ rs = conn1.createStatement().executeQuery(selectSQL);
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testInflightUpdateNotSeen() throws Exception {
+ String transTableName = generateUniqueName();
+ String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
+ String selectSQL = "SELECT * FROM " + fullTableName;
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ Connection conn2 = DriverManager.getConnection(getUrl())) {
+ createTransactionalTable(conn, fullTableName);
+ conn1.setAutoCommit(false);
+ conn2.setAutoCommit(true);
+ ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+ assertFalse(rs.next());
+
+ String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn1.prepareStatement(upsert);
+ // upsert two rows
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.execute();
+ conn1.commit();
+
+ TestUtil.setRowKeyColumns(stmt, 2);
+ stmt.execute();
+
+ rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName + " WHERE int_col1 IS NULL");
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+
+ upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, int_col1) VALUES(?, ?, ?, ?, ?, ?, 1)";
+ stmt = conn1.prepareStatement(upsert);
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.execute();
+
+ rs = conn1.createStatement().executeQuery("SELECT int_col1 FROM " + fullTableName + " WHERE int_col1 = 1");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName + " WHERE int_col1 = 1");
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE int_col1 = 1");
+ assertFalse(rs.next());
+
+ conn1.commit();
+
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName + " WHERE int_col1 = 1");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE int_col1 = 1");
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testInflightDeleteNotSeen() throws Exception {
+ String transTableName = generateUniqueName();
+ String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
+ String selectSQL = "SELECT * FROM " + fullTableName;
+ try (Connection conn = DriverManager.getConnection(getUrl());
+ Connection conn1 = DriverManager.getConnection(getUrl());
+ Connection conn2 = DriverManager.getConnection(getUrl())) {
+ createTransactionalTable(conn, fullTableName);
+ conn1.setAutoCommit(false);
+ conn2.setAutoCommit(true);
+ ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
+ assertFalse(rs.next());
+
+ String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
+ PreparedStatement stmt = conn1.prepareStatement(upsert);
+ // upsert two rows
+ TestUtil.setRowKeyColumns(stmt, 1);
+ stmt.execute();
+ TestUtil.setRowKeyColumns(stmt, 2);
+ stmt.execute();
+
+ conn1.commit();
+
+ rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+
+ String delete = "DELETE FROM " + fullTableName + " WHERE varchar_pk = 'varchar1'";
+ stmt = conn1.prepareStatement(delete);
+ int count = stmt.executeUpdate();
+ assertEquals(1,count);
+
+ rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertFalse(rs.next());
+
+ conn1.commit();
+
+ rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.next());
+ }
+ }
+
+ @Test
+ public void testExternalTxContext() throws Exception {
+ ResultSet rs;
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(false);
+ String fullTableName = generateUniqueName();
+ PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+
+ TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient();
+
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
+ HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
+ stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
+ conn.commit();
+
+ try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+ rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ }
+
+ // Use HBase level Tephra APIs to start a new transaction
+ TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW);
+ TransactionContext txContext = new TransactionContext(txServiceClient, txAware);
+ txContext.start();
+
+ // Use HBase APIs to add a new row
+ Put put = new Put(Bytes.toBytes("z"));
+ put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b"));
+ txAware.put(put);
+
+ // Use Phoenix APIs to add new row (sharing the transaction context)
+ pconn.setTransactionContext(txContext);
+ conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('y', 'c', 'c')");
+
+ // New connection should not see data as it hasn't been committed yet
+ try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+ rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(1,rs.getInt(1));
+ }
+
+ // Use new connection to create a row with a conflict
+ Connection connWithConflict = DriverManager.getConnection(getUrl(), props);
+ connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('z', 'd', 'd')");
+
+ // Existing connection should see data even though it hasn't been committed yet
+ rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+
+ // Use Tephra APIs directly to finish (i.e. commit) the transaction
+ txContext.finish();
+
+ // Confirm that attempt to commit row with conflict fails
+ try {
+ connWithConflict.commit();
+ fail();
+ } catch (SQLException e) {
+ assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode());
+ } finally {
+ connWithConflict.close();
+ }
+
+ // New connection should now see data as it has been committed
+ try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+ rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+ }
+
+ // Repeat the same as above, but this time abort the transaction
+ txContext = new TransactionContext(txServiceClient, txAware);
+ txContext.start();
+
+ // Use HBase APIs to add a new row
+ put = new Put(Bytes.toBytes("j"));
+ put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
+ put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e"));
+ txAware.put(put);
+
+ // Use Phoenix APIs to add new row (sharing the transaction context)
+ pconn.setTransactionContext(txContext);
+ conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('k', 'f', 'f')");
+
+ // Existing connection should see data even though it hasn't been committed yet
+ rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(5,rs.getInt(1));
+
+ connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('k', 'g', 'g')");
+ rs = connWithConflict.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+
+ // Use Tephra APIs directly to abort (i.e. rollback) the transaction
+ txContext.abort();
+
+ rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3,rs.getInt(1));
+
+ // Should succeed since conflicting row was aborted
+ connWithConflict.commit();
+
+ // New connection should now see data as it has been committed
+ try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
+ rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(4,rs.getInt(1));
+ }
+
+ // Even using HBase APIs directly, we shouldn't find 'j' since a delete marker would have been
+ // written to hide it.
+ Result result = htable.get(new Get(Bytes.toBytes("j")));
+ assertTrue(result.isEmpty());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cae4a7c4/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java
deleted file mode 100644
index e0005e4..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/NotThreadSafeTransactionIT.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.tx;
-
-import static org.apache.phoenix.util.TestUtil.INDEX_DATA_SCHEMA;
-import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.apache.phoenix.util.TestUtil.createTransactionalTable;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Properties;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.apache.tephra.TransactionContext;
-import org.apache.tephra.TransactionSystemClient;
-import org.apache.tephra.TxConstants;
-import org.apache.tephra.hbase.TransactionAwareHTable;
-import org.junit.Test;
-
-/**
- *
- * Transaction related tests that flap when run in parallel.
- * TODO: review with Tephra community
- *
- */
-@NotThreadSafe // Prevents test methods from running in parallel
-public class NotThreadSafeTransactionIT extends ParallelStatsDisabledIT {
- @Test
- public void testDelete() throws Exception {
- String transTableName = generateUniqueName();
- String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
- String selectSQL = "SELECT * FROM " + fullTableName;
- try (Connection conn = DriverManager.getConnection(getUrl());
- Connection conn1 = DriverManager.getConnection(getUrl());
- Connection conn2 = DriverManager.getConnection(getUrl())) {
- TestUtil.createTransactionalTable(conn, fullTableName);
- conn1.setAutoCommit(false);
- ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
- assertFalse(rs.next());
-
- String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
- PreparedStatement stmt = conn1.prepareStatement(upsert);
- // upsert two rows
- TestUtil.setRowKeyColumns(stmt, 1);
- stmt.execute();
- conn1.commit();
-
- TestUtil.setRowKeyColumns(stmt, 2);
- stmt.execute();
-
- // verify rows can be read even though commit has not been called
- int rowsDeleted = conn1.createStatement().executeUpdate("DELETE FROM " + fullTableName);
- assertEquals(2, rowsDeleted);
-
- // Delete and second upsert not committed yet, so there should be one row.
- rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
-
- conn1.commit();
-
- // verify rows are deleted after commit
- rs = conn1.createStatement().executeQuery(selectSQL);
- assertFalse(rs.next());
- }
- }
-
- @Test
- public void testInflightUpdateNotSeen() throws Exception {
- String transTableName = generateUniqueName();
- String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
- String selectSQL = "SELECT * FROM " + fullTableName;
- try (Connection conn = DriverManager.getConnection(getUrl());
- Connection conn1 = DriverManager.getConnection(getUrl());
- Connection conn2 = DriverManager.getConnection(getUrl())) {
- createTransactionalTable(conn, fullTableName);
- conn1.setAutoCommit(false);
- conn2.setAutoCommit(true);
- ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
- assertFalse(rs.next());
-
- String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
- PreparedStatement stmt = conn1.prepareStatement(upsert);
- // upsert two rows
- TestUtil.setRowKeyColumns(stmt, 1);
- stmt.execute();
- conn1.commit();
-
- TestUtil.setRowKeyColumns(stmt, 2);
- stmt.execute();
-
- rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName + " WHERE int_col1 IS NULL");
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
-
- upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk, int_col1) VALUES(?, ?, ?, ?, ?, ?, 1)";
- stmt = conn1.prepareStatement(upsert);
- TestUtil.setRowKeyColumns(stmt, 1);
- stmt.execute();
-
- rs = conn1.createStatement().executeQuery("SELECT int_col1 FROM " + fullTableName + " WHERE int_col1 = 1");
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertFalse(rs.next());
-
- rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName + " WHERE int_col1 = 1");
- assertTrue(rs.next());
- assertEquals(0, rs.getInt(1));
- rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE int_col1 = 1");
- assertFalse(rs.next());
-
- conn1.commit();
-
- rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName + " WHERE int_col1 = 1");
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- rs = conn2.createStatement().executeQuery("SELECT * FROM " + fullTableName + " WHERE int_col1 = 1");
- assertTrue(rs.next());
- assertFalse(rs.next());
- }
- }
-
- @Test
- public void testInflightDeleteNotSeen() throws Exception {
- String transTableName = generateUniqueName();
- String fullTableName = INDEX_DATA_SCHEMA + QueryConstants.NAME_SEPARATOR + transTableName;
- String selectSQL = "SELECT * FROM " + fullTableName;
- try (Connection conn = DriverManager.getConnection(getUrl());
- Connection conn1 = DriverManager.getConnection(getUrl());
- Connection conn2 = DriverManager.getConnection(getUrl())) {
- createTransactionalTable(conn, fullTableName);
- conn1.setAutoCommit(false);
- conn2.setAutoCommit(true);
- ResultSet rs = conn1.createStatement().executeQuery(selectSQL);
- assertFalse(rs.next());
-
- String upsert = "UPSERT INTO " + fullTableName + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) VALUES(?, ?, ?, ?, ?, ?)";
- PreparedStatement stmt = conn1.prepareStatement(upsert);
- // upsert two rows
- TestUtil.setRowKeyColumns(stmt, 1);
- stmt.execute();
- TestUtil.setRowKeyColumns(stmt, 2);
- stmt.execute();
-
- conn1.commit();
-
- rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
-
- String delete = "DELETE FROM " + fullTableName + " WHERE varchar_pk = 'varchar1'";
- stmt = conn1.prepareStatement(delete);
- int count = stmt.executeUpdate();
- assertEquals(1,count);
-
- rs = conn1.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertFalse(rs.next());
-
- rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
- assertTrue(rs.next());
- assertEquals(2, rs.getInt(1));
- assertFalse(rs.next());
-
- conn1.commit();
-
- rs = conn2.createStatement().executeQuery("SELECT count(*) FROM " + fullTableName);
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertFalse(rs.next());
- }
- }
-
- @Test
- public void testExternalTxContext() throws Exception {
- ResultSet rs;
- Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
- Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.setAutoCommit(false);
- String fullTableName = generateUniqueName();
- PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
-
- TransactionSystemClient txServiceClient = pconn.getQueryServices().getTransactionSystemClient();
-
- Statement stmt = conn.createStatement();
- stmt.execute("CREATE TABLE " + fullTableName + "(K VARCHAR PRIMARY KEY, V1 VARCHAR, V2 VARCHAR) TRANSACTIONAL=true");
- HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes(fullTableName));
- stmt.executeUpdate("upsert into " + fullTableName + " values('x', 'a', 'a')");
- conn.commit();
-
- try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
- rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
- assertTrue(rs.next());
- assertEquals(1,rs.getInt(1));
- }
-
- // Use HBase level Tephra APIs to start a new transaction
- TransactionAwareHTable txAware = new TransactionAwareHTable(htable, TxConstants.ConflictDetection.ROW);
- TransactionContext txContext = new TransactionContext(txServiceClient, txAware);
- txContext.start();
-
- // Use HBase APIs to add a new row
- Put put = new Put(Bytes.toBytes("z"));
- put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
- put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("b"));
- txAware.put(put);
-
- // Use Phoenix APIs to add new row (sharing the transaction context)
- pconn.setTransactionContext(txContext);
- conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('y', 'c', 'c')");
-
- // New connection should not see data as it hasn't been committed yet
- try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
- rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
- assertTrue(rs.next());
- assertEquals(1,rs.getInt(1));
- }
-
- // Use new connection to create a row with a conflict
- Connection connWithConflict = DriverManager.getConnection(getUrl(), props);
- connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('z', 'd', 'd')");
-
- // Existing connection should see data even though it hasn't been committed yet
- rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
- assertTrue(rs.next());
- assertEquals(3,rs.getInt(1));
-
- // Use Tephra APIs directly to finish (i.e. commit) the transaction
- txContext.finish();
-
- // Confirm that attempt to commit row with conflict fails
- try {
- connWithConflict.commit();
- fail();
- } catch (SQLException e) {
- assertEquals(SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION.getErrorCode(), e.getErrorCode());
- } finally {
- connWithConflict.close();
- }
-
- // New connection should now see data as it has been committed
- try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
- rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
- assertTrue(rs.next());
- assertEquals(3,rs.getInt(1));
- }
-
- // Repeat the same as above, but this time abort the transaction
- txContext = new TransactionContext(txServiceClient, txAware);
- txContext.start();
-
- // Use HBase APIs to add a new row
- put = new Put(Bytes.toBytes("j"));
- put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES);
- put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V1"), Bytes.toBytes("e"));
- txAware.put(put);
-
- // Use Phoenix APIs to add new row (sharing the transaction context)
- pconn.setTransactionContext(txContext);
- conn.createStatement().executeUpdate("upsert into " + fullTableName + " values('k', 'f', 'f')");
-
- // Existing connection should see data even though it hasn't been committed yet
- rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
- assertTrue(rs.next());
- assertEquals(5,rs.getInt(1));
-
- connWithConflict.createStatement().execute("upsert into " + fullTableName + " values('k', 'g', 'g')");
- rs = connWithConflict.createStatement().executeQuery("select count(*) from " + fullTableName);
- assertTrue(rs.next());
- assertEquals(4,rs.getInt(1));
-
- // Use Tephra APIs directly to abort (i.e. rollback) the transaction
- txContext.abort();
-
- rs = conn.createStatement().executeQuery("select count(*) from " + fullTableName);
- assertTrue(rs.next());
- assertEquals(3,rs.getInt(1));
-
- // Should succeed since conflicting row was aborted
- connWithConflict.commit();
-
- // New connection should now see data as it has been committed
- try (Connection newConn = DriverManager.getConnection(getUrl(), props)) {
- rs = newConn.createStatement().executeQuery("select count(*) from " + fullTableName);
- assertTrue(rs.next());
- assertEquals(4,rs.getInt(1));
- }
-
- // Even using HBase APIs directly, we shouldn't find 'j' since a delete marker would have been
- // written to hide it.
- Result result = htable.get(new Get(Bytes.toBytes("j")));
- assertTrue(result.isEmpty());
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cae4a7c4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dac2b4d..2da2077 100644
--- a/pom.xml
+++ b/pom.xml
@@ -234,7 +234,7 @@
<runOrder>alphabetical</runOrder>
<reuseForks>true</reuseForks>
<runOrder>alphabetical</runOrder>
- <parallel>classesAndMethods</parallel>
+ <parallel>methods</parallel>
<threadCount>20</threadCount>
<argLine>-Xmx2000m -XX:MaxPermSize=256m -Djava.security.egd=file:/dev/./urandom "-Djava.library.path=${hadoop.library.path}${path.separator}${java.library.path}" -XX:NewRatio=4 -XX:SurvivorRatio=8 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+DisableExplicitGC -XX:+UseCMSInitiatingOccupancyOnly -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:CMSInitiatingOccupancyFraction=68</argLine>
<redirectTestOutputToFile>${test.output.tofile}</redirectTestOutputToFile>
@@ -261,7 +261,7 @@
<runOrder>alphabetical</runOrder>
<reuseForks>true</reuseForks>
<runOrder>alphabetical</runOrder>
- <parallel>classesAndMethods</parallel>
+ <parallel>methods</parallel>
<threadCount>20</threadCount>
<!-- We're intermittantly hitting this assertion:
Caused by: java.lang.AssertionError: we should never remove a different context