You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/01/29 01:23:04 UTC
git commit: Fix UPSERT SELECT not working with JOIN (GitHub #596)
Updated Branches:
refs/heads/master 92623a0ff -> be8c2fb50
Fix UPSERT SELECT not working with JOIN (GitHub #596)
Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/be8c2fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/be8c2fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/be8c2fb5
Branch: refs/heads/master
Commit: be8c2fb500b017f4b8300f8afd2152aea8a70905
Parents: 92623a0
Author: maryannxue <ma...@apache.org>
Authored: Tue Jan 28 19:22:32 2014 -0500
Committer: maryannxue <ma...@apache.org>
Committed: Tue Jan 28 19:22:32 2014 -0500
----------------------------------------------------------------------
.../apache/phoenix/compile/FromCompiler.java | 11 +--
.../apache/phoenix/compile/QueryCompiler.java | 18 ++--
.../apache/phoenix/compile/UpsertCompiler.java | 6 +-
.../apache/phoenix/end2end/HashJoinTest.java | 88 ++++++++++++++++++++
4 files changed, 103 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/be8c2fb5/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 92d8995..ec46560 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -141,14 +141,9 @@ public class FromCompiler {
public static ColumnResolver getResolver(SelectStatement statement, PhoenixConnection connection)
throws SQLException {
List<TableNode> fromNodes = statement.getFrom();
- if (fromNodes.size() > 1) { throw new SQLFeatureNotSupportedException("Joins not supported"); }
- SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, (NamedTableNode)fromNodes.get(0), false, false);
- return visitor;
- }
-
- public static ColumnResolver getMultiTableResolver(SelectStatement statement, PhoenixConnection connection)
- throws SQLException {
- List<TableNode> fromNodes = statement.getFrom();
+ if (fromNodes.size() == 1)
+ return new SingleTableColumnResolver(connection, (NamedTableNode)fromNodes.get(0), false, false);
+
MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection);
for (TableNode node : fromNodes) {
node.accept(visitor);
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/be8c2fb5/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index bafacf9..af1e7dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -116,13 +116,13 @@ public class QueryCompiler {
protected QueryPlan compile(SelectStatement select, Scan scan, boolean asSubquery) throws SQLException{
PhoenixConnection connection = statement.getConnection();
List<Object> binds = statement.getParameters();
- ColumnResolver resolver = FromCompiler.getMultiTableResolver(select, connection);
+ ColumnResolver resolver = FromCompiler.getResolver(select, connection);
// TODO: do this normalization outside of this so as it's not repeated by the optimizer
select = StatementNormalizer.normalize(select, resolver);
StatementContext context = new StatementContext(statement, resolver, binds, scan);
if (select.getFrom().size() == 1)
- return compileSingleQuery(context, select, binds);
+ return compileSingleQuery(context, select, binds, parallelIteratorFactory);
if (!asSubquery) {
SelectStatement optimized = JoinCompiler.optimize(context, select, statement);
@@ -130,7 +130,7 @@ public class QueryCompiler {
select = optimized;
// TODO: this is a relatively expensive operation that shouldn't be
// done multiple times
- resolver = FromCompiler.getMultiTableResolver(select, connection);
+ resolver = FromCompiler.getResolver(select, connection);
context.setResolver(resolver);
}
}
@@ -148,7 +148,7 @@ public class QueryCompiler {
context.setCurrentTable(join.getMainTable());
context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
join.projectColumns(context.getScan(), join.getMainTable());
- return compileSingleQuery(context, select, binds);
+ return compileSingleQuery(context, select, binds, null);
}
boolean[] starJoinVector = JoinCompiler.getStarJoinVector(join);
@@ -176,7 +176,7 @@ public class QueryCompiler {
StatementContext subContext = new StatementContext(statement, resolver, binds, subScan);
subContext.setCurrentTable(joinTable.getTable());
join.projectColumns(subScan, joinTable.getTable());
- joinPlans[i] = compileSingleQuery(subContext, subStatement, binds);
+ joinPlans[i] = compileSingleQuery(subContext, subStatement, binds, null);
boolean hasPostReference = join.hasPostReference(joinTable.getTable());
if (hasPostReference) {
tables[i] = subProjTable.getTable();
@@ -198,7 +198,7 @@ public class QueryCompiler {
context.setCurrentTable(join.getMainTable());
context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
join.projectColumns(context.getScan(), join.getMainTable());
- BasicQueryPlan plan = compileSingleQuery(context, JoinCompiler.getSubqueryWithoutJoin(select, join), binds);
+ BasicQueryPlan plan = compileSingleQuery(context, JoinCompiler.getSubqueryWithoutJoin(select, join), binds, parallelIteratorFactory);
Expression postJoinFilterExpression = join.compilePostFilterExpression(context);
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression);
return new HashJoinPlan(plan, joinInfo, hashExpressions, joinPlans);
@@ -230,7 +230,7 @@ public class QueryCompiler {
context.setCurrentTable(lastJoinTable.getTable());
context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
join.projectColumns(context.getScan(), lastJoinTable.getTable());
- BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds);
+ BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, parallelIteratorFactory);
Expression postJoinFilterExpression = join.compilePostFilterExpression(context);
HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression);
return new HashJoinPlan(rhsPlan, joinInfo, new List[] {hashExpressions}, new QueryPlan[] {lhsPlan});
@@ -240,7 +240,7 @@ public class QueryCompiler {
throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported.");
}
- protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds) throws SQLException{
+ protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, ParallelIteratorFactory parallelIteratorFactory) throws SQLException{
PhoenixConnection connection = statement.getConnection();
ColumnResolver resolver = context.getResolver();
TableRef tableRef = context.getCurrentTable();
@@ -286,4 +286,4 @@ public class QueryCompiler {
return new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/be8c2fb5/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7f12117..4de4934 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -332,10 +332,10 @@ public class UpsertCompiler {
SelectStatement select = upsert.getSelect();
assert(select != null);
select = addTenantAndViewConstants(table, select, tenantId, addViewColumns);
- TableRef selectTableRef = FromCompiler.getResolver(select, connection).getTables().get(0);
- sameTable = tableRef.equals(selectTableRef);
+ sameTable = select.getFrom().size() == 1
+ && tableRef.equals(FromCompiler.getResolver(select, connection).getTables().get(0));
/* We can run the upsert in a coprocessor if:
- * 1) the into table matches from table
+ * 1) the FROM clause has only one table and the INTO table matches that table
* 2) the select query isn't doing aggregation
* 3) autoCommit is on
* 4) the table is not immutable, as the client is the one that figures out the additional
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/be8c2fb5/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java b/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java
index c7f95ec..7c1b50f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java
@@ -1435,5 +1435,93 @@ public class HashJoinTest extends BaseHBaseManagedTimeTest {
conn.close();
}
}
+
+    /**
+     * Verifies that UPSERT ... SELECT works when the SELECT contains joins.
+     *
+     * The test creates TEMP_JOINED_TABLE, populates it with two statements
+     * (a plain three-table LEFT JOIN, then an aggregated LEFT JOIN with
+     * GROUP BY / ORDER BY), and reads every row back with SELECT *.
+     *
+     * Rows are asserted in the order the result set returns them; the
+     * expectations below are ascending by (order_id, item_name), i.e. the
+     * table's primary key. NOTE(review): there is no ORDER BY on the read
+     * query, so this presumably relies on primary-key scan order - confirm.
+     *
+     * All assertEquals calls use JUnit's (expected, actual) argument order so
+     * that failure messages report the values the right way round.
+     *
+     * @throws Exception if the connection, DDL, upserts or query fail
+     */
+    @Test
+    public void testUpsertWithJoin() throws Exception {
+        String tempTable = "TEMP_JOINED_TABLE";
+        // Non-aggregate upsert: orders joined to items joined to suppliers.
+        String upsertQuery1 = "UPSERT INTO " + tempTable + "(order_id, item_name, supplier_name, quantity, date) "
+            + "SELECT order_id, i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE + " o LEFT JOIN "
+            + JOIN_ITEM_TABLE + " i ON o.item_id = i.item_id LEFT JOIN "
+            + JOIN_SUPPLIER_TABLE + " s ON i.supplier_id = s.supplier_id";
+        // Aggregate upsert: per-item quantity totals stored under the constant key 'ORDER_SUM'.
+        String upsertQuery2 = "UPSERT INTO " + tempTable + "(order_id, item_name, quantity) "
+            + "SELECT 'ORDER_SUM', i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE + " o LEFT JOIN "
+            + JOIN_ITEM_TABLE + " i ON o.item_id = i.item_id GROUP BY i.name ORDER BY i.name";
+        String query = "SELECT * FROM " + tempTable;
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
+        conn.setAutoCommit(true);
+        try {
+            conn.createStatement().execute("CREATE TABLE " + tempTable
+                + " (order_id varchar not null, "
+                + " item_name varchar not null, "
+                + " supplier_name varchar, "
+                + " quantity integer, "
+                + " date timestamp "
+                + " CONSTRAINT pk PRIMARY KEY (order_id, item_name))");
+            conn.createStatement().execute(upsertQuery1);
+            conn.createStatement().execute(upsertQuery2);
+
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+
+            // Rows written by upsertQuery1: one per order, with item and supplier names resolved.
+            assertTrue(rs.next());
+            assertEquals("000000000000001", rs.getString(1));
+            assertEquals("T1", rs.getString(2));
+            assertEquals("S1", rs.getString(3));
+            assertEquals(1000, rs.getInt(4));
+            assertNotNull(rs.getDate(5));
+            assertTrue(rs.next());
+            assertEquals("000000000000002", rs.getString(1));
+            assertEquals("T6", rs.getString(2));
+            assertEquals("S6", rs.getString(3));
+            assertEquals(2000, rs.getInt(4));
+            assertNotNull(rs.getDate(5));
+            assertTrue(rs.next());
+            assertEquals("000000000000003", rs.getString(1));
+            assertEquals("T2", rs.getString(2));
+            assertEquals("S1", rs.getString(3));
+            assertEquals(3000, rs.getInt(4));
+            assertNotNull(rs.getDate(5));
+            assertTrue(rs.next());
+            assertEquals("000000000000004", rs.getString(1));
+            assertEquals("T6", rs.getString(2));
+            assertEquals("S6", rs.getString(3));
+            assertEquals(4000, rs.getInt(4));
+            assertNotNull(rs.getDate(5));
+            assertTrue(rs.next());
+            assertEquals("000000000000005", rs.getString(1));
+            assertEquals("T3", rs.getString(2));
+            assertEquals("S2", rs.getString(3));
+            assertEquals(5000, rs.getInt(4));
+            assertNotNull(rs.getDate(5));
+
+            // Rows written by upsertQuery2: supplier_name and date were not upserted, so they are null.
+            assertTrue(rs.next());
+            assertEquals("ORDER_SUM", rs.getString(1));
+            assertEquals("T1", rs.getString(2));
+            assertNull(rs.getString(3));
+            assertEquals(1000, rs.getInt(4));
+            assertNull(rs.getDate(5));
+            assertTrue(rs.next());
+            assertEquals("ORDER_SUM", rs.getString(1));
+            assertEquals("T2", rs.getString(2));
+            assertNull(rs.getString(3));
+            assertEquals(3000, rs.getInt(4));
+            assertNull(rs.getDate(5));
+            assertTrue(rs.next());
+            assertEquals("ORDER_SUM", rs.getString(1));
+            assertEquals("T3", rs.getString(2));
+            assertNull(rs.getString(3));
+            assertEquals(5000, rs.getInt(4));
+            assertNull(rs.getDate(5));
+            assertTrue(rs.next());
+            assertEquals("ORDER_SUM", rs.getString(1));
+            assertEquals("T6", rs.getString(2));
+            assertNull(rs.getString(3));
+            assertEquals(6000, rs.getInt(4));
+            assertNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
}