You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:44:15 UTC
[37/52] [abbrv] phoenix git commit: PHOENIX-2480 SQL Query with
multiple projection selections over multiple tables having LEFT OUTER JOINS
returns completely null for random columns even when data is present
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bc9974ba/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
new file mode 100644
index 0000000..d9016d0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinMoreIT.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class SortMergeJoinMoreIT extends BaseHBaseManagedTimeIT {
+
+    /**
+     * Starts the test driver with a custom configuration before any test in this class runs.
+     * The index mutation batch size threshold is set to 2; according to the original author's
+     * note this forces the server cache to be used.
+     */
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> serverProps = Maps.newHashMapWithExpectedSize(3);
+        serverProps.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, String.valueOf(2));
+        // The configuration has to be in place before the server is started.
+        ReadOnlyProps driverProps = new ReadOnlyProps(serverProps.entrySet().iterator());
+        setUpTestDriver(driverProps);
+    }
+
+ @Test
+ public void testJoinOverSaltedTables() throws Exception {
+ String tempTableNoSalting = "TEMP_TABLE_NO_SALTING";
+ String tempTableWithSalting = "TEMP_TABLE_WITH_SALTING";
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ try {
+ conn.createStatement().execute("CREATE TABLE " + tempTableNoSalting
+ + " (mypk INTEGER NOT NULL PRIMARY KEY, "
+ + " col1 INTEGER)");
+ conn.createStatement().execute("CREATE TABLE " + tempTableWithSalting
+ + " (mypk INTEGER NOT NULL PRIMARY KEY, "
+ + " col1 INTEGER) SALT_BUCKETS=4");
+
+ PreparedStatement upsertStmt = conn.prepareStatement(
+ "upsert into " + tempTableNoSalting + "(mypk, col1) " + "values (?, ?)");
+ for (int i = 0; i < 3; i++) {
+ upsertStmt.setInt(1, i + 1);
+ upsertStmt.setInt(2, 3 - i);
+ upsertStmt.execute();
+ }
+ conn.commit();
+
+ upsertStmt = conn.prepareStatement(
+ "upsert into " + tempTableWithSalting + "(mypk, col1) " + "values (?, ?)");
+ for (int i = 0; i < 6; i++) {
+ upsertStmt.setInt(1, i + 1);
+ upsertStmt.setInt(2, 3 - (i % 3));
+ upsertStmt.execute();
+ }
+ conn.commit();
+
+ // LHS=unsalted JOIN RHS=salted
+ String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+ + tempTableNoSalting + " lhs JOIN "
+ + tempTableWithSalting + " rhs ON rhs.mypk = lhs.col1 ORDER BY lhs.mypk";
+ PreparedStatement statement = conn.prepareStatement(query);
+ ResultSet rs = statement.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 1);
+ assertEquals(rs.getInt(2), 3);
+ assertEquals(rs.getInt(3), 3);
+ assertEquals(rs.getInt(4), 1);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 2);
+ assertEquals(rs.getInt(2), 2);
+ assertEquals(rs.getInt(3), 2);
+ assertEquals(rs.getInt(4), 2);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 3);
+ assertEquals(rs.getInt(2), 1);
+ assertEquals(rs.getInt(3), 1);
+ assertEquals(rs.getInt(4), 3);
+
+ assertFalse(rs.next());
+
+ // LHS=salted JOIN RHS=salted
+ query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+ + tempTableWithSalting + " lhs JOIN "
+ + tempTableNoSalting + " rhs ON rhs.mypk = lhs.col1 ORDER BY lhs.mypk";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 1);
+ assertEquals(rs.getInt(2), 3);
+ assertEquals(rs.getInt(3), 3);
+ assertEquals(rs.getInt(4), 1);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 2);
+ assertEquals(rs.getInt(2), 2);
+ assertEquals(rs.getInt(3), 2);
+ assertEquals(rs.getInt(4), 2);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 3);
+ assertEquals(rs.getInt(2), 1);
+ assertEquals(rs.getInt(3), 1);
+ assertEquals(rs.getInt(4), 3);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 4);
+ assertEquals(rs.getInt(2), 3);
+ assertEquals(rs.getInt(3), 3);
+ assertEquals(rs.getInt(4), 1);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 5);
+ assertEquals(rs.getInt(2), 2);
+ assertEquals(rs.getInt(3), 2);
+ assertEquals(rs.getInt(4), 2);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 6);
+ assertEquals(rs.getInt(2), 1);
+ assertEquals(rs.getInt(3), 1);
+ assertEquals(rs.getInt(4), 3);
+
+ assertFalse(rs.next());
+
+ // LHS=salted JOIN RHS=salted
+ query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ lhs.mypk, lhs.col1, rhs.mypk, rhs.col1 FROM "
+ + tempTableWithSalting + " lhs JOIN "
+ + tempTableWithSalting + " rhs ON rhs.mypk = (lhs.col1 + 3) ORDER BY lhs.mypk";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 1);
+ assertEquals(rs.getInt(2), 3);
+ assertEquals(rs.getInt(3), 6);
+ assertEquals(rs.getInt(4), 1);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 2);
+ assertEquals(rs.getInt(2), 2);
+ assertEquals(rs.getInt(3), 5);
+ assertEquals(rs.getInt(4), 2);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 3);
+ assertEquals(rs.getInt(2), 1);
+ assertEquals(rs.getInt(3), 4);
+ assertEquals(rs.getInt(4), 3);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 4);
+ assertEquals(rs.getInt(2), 3);
+ assertEquals(rs.getInt(3), 6);
+ assertEquals(rs.getInt(4), 1);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 5);
+ assertEquals(rs.getInt(2), 2);
+ assertEquals(rs.getInt(3), 5);
+ assertEquals(rs.getInt(4), 2);
+ assertTrue(rs.next());
+ assertEquals(rs.getInt(1), 6);
+ assertEquals(rs.getInt(2), 1);
+ assertEquals(rs.getInt(3), 4);
+ assertEquals(rs.getInt(4), 3);
+
+ assertFalse(rs.next());
+ } finally {
+ conn.close();
+ }
+ }
+
+    /**
+     * Verifies a sort-merge join whose join key on one side is a dynamic column.
+     * Five rows are upserted into table A; an UPSERT SELECT then copies each pkA into table B
+     * as the dynamic column {@code pkA INTEGER}, keyed by values drawn from a temporary
+     * sequence. Joining B (with the dynamic column declared in the FROM clause) to A on pkA
+     * is expected to return all five rows of A, in pkA order.
+     */
+    @Test
+    public void testJoinOnDynamicColumns() throws Exception {
+        String tableA = "tableA";
+        String tableB = "tableB";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        try {
+            conn = DriverManager.getConnection(getUrl(), props);
+            String ddlA = "CREATE TABLE " + tableA + " (pkA INTEGER NOT NULL, " + " colA1 INTEGER, "
+                    + " colA2 VARCHAR " + "CONSTRAINT PK PRIMARY KEY" + "(pkA)" + ")";
+
+            String ddlB = "CREATE TABLE " + tableB + " (pkB INTEGER NOT NULL PRIMARY KEY, " + " colB INTEGER)";
+            stmt = conn.prepareStatement(ddlA);
+            stmt.execute();
+            stmt.close();
+
+            stmt = conn.prepareStatement(ddlB);
+            stmt.execute();
+            stmt.close();
+
+            // NOTE(review): the statements below spell the tables TABLEA/TABLEB while the DDL
+            // uses tableA/tableB; presumably unquoted identifiers are case-insensitive - confirm.
+            String upsertA = "UPSERT INTO TABLEA (pkA, colA1, colA2) VALUES(?, ?, ?)";
+            stmt = conn.prepareStatement(upsertA);
+            for (int i = 0; i < 5; i++) {
+                stmt.setInt(1, i);
+                stmt.setInt(2, i + 10);
+                stmt.setString(3, "00" + i);
+                stmt.executeUpdate();
+            }
+            conn.commit();
+            stmt.close();
+
+            // upsert select dynamic columns in tableB
+            conn.createStatement().execute("CREATE SEQUENCE SEQB");
+            String upsertBSelectA = "UPSERT INTO TABLEB (pkB, pkA INTEGER)"
+                    + "SELECT NEXT VALUE FOR SEQB, pkA FROM TABLEA";
+            stmt = conn.prepareStatement(upsertBSelectA);
+            stmt.executeUpdate();
+            stmt.close();
+            conn.commit();
+            conn.createStatement().execute("DROP SEQUENCE SEQB");
+
+            // perform a join between tableB and tableA by joining on the dynamic column that we upserted in
+            // tableB. This join should return all the rows from table A.
+            String joinSql = "SELECT /*+ USE_SORT_MERGE_JOIN*/ A.pkA, A.COLA1, A.colA2 FROM TABLEB B(pkA INTEGER) JOIN TABLEA A ON a.pkA = b.pkA";
+            stmt = conn.prepareStatement(joinSql);
+            ResultSet rs = stmt.executeQuery();
+            int rowCount = 0;
+            while (rs.next()) {
+                // check that we get back all the rows that we upserted for tableA above.
+                // JUnit's argument order is (expected, actual).
+                assertEquals(rowCount, rs.getInt(1));
+                assertEquals(rowCount + 10, rs.getInt(2));
+                assertEquals("00" + rowCount, rs.getString(3));
+                rowCount++;
+            }
+            assertEquals(5, rowCount);
+        } finally {
+            if (stmt != null) {
+                stmt.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Runs a sort-merge join over two derived tables, filtered by nested ALL/ANY subqueries,
+     * against three freshly created tables that contain no rows, and expects an empty result.
+     * The three tables are dropped afterwards and the connection is always closed, even if
+     * one of the drops fails.
+     */
+    @Test
+    public void testSubqueryWithoutData() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+
+        try {
+            String grammarTableDdl = "CREATE TABLE IF NOT EXISTS GRAMMAR_TABLE (ID INTEGER PRIMARY KEY, " +
+                    "unsig_id UNSIGNED_INT, big_id BIGINT, unsig_long_id UNSIGNED_LONG, tiny_id TINYINT," +
+                    "unsig_tiny_id UNSIGNED_TINYINT, small_id SMALLINT, unsig_small_id UNSIGNED_SMALLINT," +
+                    "float_id FLOAT, unsig_float_id UNSIGNED_FLOAT, double_id DOUBLE, unsig_double_id UNSIGNED_DOUBLE," +
+                    "decimal_id DECIMAL, boolean_id BOOLEAN, time_id TIME, date_id DATE, timestamp_id TIMESTAMP," +
+                    "unsig_time_id TIME, unsig_date_id DATE, unsig_timestamp_id TIMESTAMP, varchar_id VARCHAR (30)," +
+                    "char_id CHAR (30), binary_id BINARY (100), varbinary_id VARBINARY (100))";
+
+            String largeTableDdl = "CREATE TABLE IF NOT EXISTS LARGE_TABLE (ID INTEGER PRIMARY KEY, " +
+                    "unsig_id UNSIGNED_INT, big_id BIGINT, unsig_long_id UNSIGNED_LONG, tiny_id TINYINT," +
+                    "unsig_tiny_id UNSIGNED_TINYINT, small_id SMALLINT, unsig_small_id UNSIGNED_SMALLINT," +
+                    "float_id FLOAT, unsig_float_id UNSIGNED_FLOAT, double_id DOUBLE, unsig_double_id UNSIGNED_DOUBLE," +
+                    "decimal_id DECIMAL, boolean_id BOOLEAN, time_id TIME, date_id DATE, timestamp_id TIMESTAMP," +
+                    "unsig_time_id TIME, unsig_date_id DATE, unsig_timestamp_id TIMESTAMP, varchar_id VARCHAR (30)," +
+                    "char_id CHAR (30), binary_id BINARY (100), varbinary_id VARBINARY (100))";
+
+            String secondaryLargeTableDdl = "CREATE TABLE IF NOT EXISTS SECONDARY_LARGE_TABLE (SEC_ID INTEGER PRIMARY KEY," +
+                    "sec_unsig_id UNSIGNED_INT, sec_big_id BIGINT, sec_usnig_long_id UNSIGNED_LONG, sec_tiny_id TINYINT," +
+                    "sec_unsig_tiny_id UNSIGNED_TINYINT, sec_small_id SMALLINT, sec_unsig_small_id UNSIGNED_SMALLINT," +
+                    "sec_float_id FLOAT, sec_unsig_float_id UNSIGNED_FLOAT, sec_double_id DOUBLE, sec_unsig_double_id UNSIGNED_DOUBLE," +
+                    "sec_decimal_id DECIMAL, sec_boolean_id BOOLEAN, sec_time_id TIME, sec_date_id DATE," +
+                    "sec_timestamp_id TIMESTAMP, sec_unsig_time_id TIME, sec_unsig_date_id DATE, sec_unsig_timestamp_id TIMESTAMP," +
+                    "sec_varchar_id VARCHAR (30), sec_char_id CHAR (30), sec_binary_id BINARY (100), sec_varbinary_id VARBINARY (100))";
+            createTestTable(getUrl(), grammarTableDdl);
+            createTestTable(getUrl(), largeTableDdl);
+            createTestTable(getUrl(), secondaryLargeTableDdl);
+
+            String query = "SELECT /*+USE_SORT_MERGE_JOIN*/ * FROM (SELECT ID, BIG_ID, DATE_ID FROM LARGE_TABLE AS A WHERE (A.ID % 5) = 0) AS A " +
+                    "INNER JOIN (SELECT SEC_ID, SEC_TINY_ID, SEC_UNSIG_FLOAT_ID FROM SECONDARY_LARGE_TABLE AS B WHERE (B.SEC_ID % 5) = 0) AS B " +
+                    "ON A.ID=B.SEC_ID WHERE A.DATE_ID > ALL (SELECT SEC_DATE_ID FROM SECONDARY_LARGE_TABLE LIMIT 100) " +
+                    "AND B.SEC_UNSIG_FLOAT_ID = ANY (SELECT sec_unsig_float_id FROM SECONDARY_LARGE_TABLE " +
+                    "WHERE SEC_ID > ALL (SELECT MIN (ID) FROM GRAMMAR_TABLE WHERE UNSIG_ID IS NULL) AND " +
+                    "SEC_UNSIG_ID < ANY (SELECT DISTINCT(UNSIG_ID) FROM LARGE_TABLE WHERE UNSIG_ID<2500) LIMIT 1000) " +
+                    "AND A.ID < 10000";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            // No rows were ever upserted, so the join must come back empty.
+            assertFalse(rs.next());
+        } finally {
+            // Nested try/finally so that a failing drop cannot leak the statement or connection.
+            try {
+                Statement dropStatement = conn.createStatement();
+                try {
+                    dropStatement.executeUpdate("drop table GRAMMAR_TABLE");
+                    dropStatement.executeUpdate("drop table LARGE_TABLE");
+                    dropStatement.executeUpdate("drop table SECONDARY_LARGE_TABLE");
+                } finally {
+                    dropStatement.close();
+                }
+            } finally {
+                conn.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/bc9974ba/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 8a9abc2..ad65c1c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -285,23 +285,28 @@ public class QueryCompiler {
JoinType[] joinTypes = new JoinType[count];
PTable[] tables = new PTable[count];
int[] fieldPositions = new int[count];
- HashSubPlan[] subPlans = new HashSubPlan[count];
+ StatementContext[] subContexts = new StatementContext[count];
+ QueryPlan[] subPlans = new QueryPlan[count];
+ HashSubPlan[] hashPlans = new HashSubPlan[count];
fieldPositions[0] = projectedTable.getColumns().size() - projectedTable.getPKColumns().size();
for (int i = 0; i < count; i++) {
JoinSpec joinSpec = joinSpecs.get(i);
Scan subScan = ScanUtil.newScan(originalScan);
- StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
- QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true, true, null);
+ subContexts[i] = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
+ subPlans[i] = compileJoinQuery(subContexts[i], binds, joinSpec.getJoinTable(), true, true, null);
boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
if (hasPostReference) {
- tables[i] = subContext.getResolver().getTables().get(0).getTable();
+ tables[i] = subContexts[i].getResolver().getTables().get(0).getTable();
projectedTable = JoinCompiler.joinProjectedTables(projectedTable, tables[i], joinSpec.getType());
} else {
tables[i] = null;
}
+ }
+ for (int i = 0; i < count; i++) {
+ JoinSpec joinSpec = joinSpecs.get(i);
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), query.getUdfParseNodes()));
joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
- Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true);
+ Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContexts[i], true);
joinExpressions[i] = joinConditions.getFirst();
List<Expression> hashExpressions = joinConditions.getSecond();
Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
@@ -312,7 +317,7 @@ public class QueryCompiler {
if (i < count - 1) {
fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
}
- subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
+ hashPlans[i] = new HashSubPlan(i, subPlans[i], optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression);
}
TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
QueryPlan plan = compileSingleFlatQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin(), null, !table.isSubselect() && projectPKColumns ? tupleProjector : null, true);
@@ -322,7 +327,7 @@ public class QueryCompiler {
limit = plan.getLimit();
}
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable, joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit);
- return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, subPlans);
+ return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, hashPlans);
}
JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);