You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/01/29 01:23:04 UTC

git commit: Fix UPSERT SELECT does not work with JOIN (github #596)

Updated Branches:
  refs/heads/master 92623a0ff -> be8c2fb50


Fix UPSERT SELECT does not work with JOIN (github #596)


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/be8c2fb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/be8c2fb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/be8c2fb5

Branch: refs/heads/master
Commit: be8c2fb500b017f4b8300f8afd2152aea8a70905
Parents: 92623a0
Author: maryannxue <ma...@apache.org>
Authored: Tue Jan 28 19:22:32 2014 -0500
Committer: maryannxue <ma...@apache.org>
Committed: Tue Jan 28 19:22:32 2014 -0500

----------------------------------------------------------------------
 .../apache/phoenix/compile/FromCompiler.java    | 11 +--
 .../apache/phoenix/compile/QueryCompiler.java   | 18 ++--
 .../apache/phoenix/compile/UpsertCompiler.java  |  6 +-
 .../apache/phoenix/end2end/HashJoinTest.java    | 88 ++++++++++++++++++++
 4 files changed, 103 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/be8c2fb5/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 92d8995..ec46560 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -141,14 +141,9 @@ public class FromCompiler {
     public static ColumnResolver getResolver(SelectStatement statement, PhoenixConnection connection)
     		throws SQLException {
     	List<TableNode> fromNodes = statement.getFrom();
-    	if (fromNodes.size() > 1) { throw new SQLFeatureNotSupportedException("Joins not supported"); }
-    	SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, (NamedTableNode)fromNodes.get(0), false, false);
-    	return visitor;
-    }
-    
-    public static ColumnResolver getMultiTableResolver(SelectStatement statement, PhoenixConnection connection)
-            throws SQLException {
-        List<TableNode> fromNodes = statement.getFrom();
+        if (fromNodes.size() == 1)
+            return new SingleTableColumnResolver(connection, (NamedTableNode)fromNodes.get(0), false, false);
+
         MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection);
         for (TableNode node : fromNodes) {
             node.accept(visitor);

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/be8c2fb5/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index bafacf9..af1e7dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -116,13 +116,13 @@ public class QueryCompiler {
     protected QueryPlan compile(SelectStatement select, Scan scan, boolean asSubquery) throws SQLException{        
         PhoenixConnection connection = statement.getConnection();
         List<Object> binds = statement.getParameters();
-        ColumnResolver resolver = FromCompiler.getMultiTableResolver(select, connection);
+        ColumnResolver resolver = FromCompiler.getResolver(select, connection);
         // TODO: do this normalization outside of this so as it's not repeated by the optimizer
         select = StatementNormalizer.normalize(select, resolver);
         StatementContext context = new StatementContext(statement, resolver, binds, scan);
         
         if (select.getFrom().size() == 1)
-            return compileSingleQuery(context, select, binds);
+            return compileSingleQuery(context, select, binds, parallelIteratorFactory);
         
         if (!asSubquery) {
             SelectStatement optimized = JoinCompiler.optimize(context, select, statement);
@@ -130,7 +130,7 @@ public class QueryCompiler {
                 select = optimized;
                 // TODO: this is a relatively expensive operation that shouldn't be
                 // done multiple times
-                resolver = FromCompiler.getMultiTableResolver(select, connection);
+                resolver = FromCompiler.getResolver(select, connection);
                 context.setResolver(resolver);
             }
         }
@@ -148,7 +148,7 @@ public class QueryCompiler {
             context.setCurrentTable(join.getMainTable());
             context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
             join.projectColumns(context.getScan(), join.getMainTable());
-            return compileSingleQuery(context, select, binds);
+            return compileSingleQuery(context, select, binds, null);
         }
         
         boolean[] starJoinVector = JoinCompiler.getStarJoinVector(join);
@@ -176,7 +176,7 @@ public class QueryCompiler {
                 StatementContext subContext = new StatementContext(statement, resolver, binds, subScan);
                 subContext.setCurrentTable(joinTable.getTable());
                 join.projectColumns(subScan, joinTable.getTable());
-                joinPlans[i] = compileSingleQuery(subContext, subStatement, binds);
+                joinPlans[i] = compileSingleQuery(subContext, subStatement, binds, null);
                 boolean hasPostReference = join.hasPostReference(joinTable.getTable());
                 if (hasPostReference) {
                     tables[i] = subProjTable.getTable();
@@ -198,7 +198,7 @@ public class QueryCompiler {
             context.setCurrentTable(join.getMainTable());
             context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
             join.projectColumns(context.getScan(), join.getMainTable());
-            BasicQueryPlan plan = compileSingleQuery(context, JoinCompiler.getSubqueryWithoutJoin(select, join), binds);
+            BasicQueryPlan plan = compileSingleQuery(context, JoinCompiler.getSubqueryWithoutJoin(select, join), binds, parallelIteratorFactory);
             Expression postJoinFilterExpression = join.compilePostFilterExpression(context);
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression);
             return new HashJoinPlan(plan, joinInfo, hashExpressions, joinPlans);
@@ -230,7 +230,7 @@ public class QueryCompiler {
             context.setCurrentTable(lastJoinTable.getTable());
             context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
             join.projectColumns(context.getScan(), lastJoinTable.getTable());
-            BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds);
+            BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, parallelIteratorFactory);
             Expression postJoinFilterExpression = join.compilePostFilterExpression(context);
             HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression);
             return new HashJoinPlan(rhsPlan, joinInfo, new List[] {hashExpressions}, new QueryPlan[] {lhsPlan});
@@ -240,7 +240,7 @@ public class QueryCompiler {
         throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported.");
     }
     
-    protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds) throws SQLException{
+    protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, ParallelIteratorFactory parallelIteratorFactory) throws SQLException{
         PhoenixConnection connection = statement.getConnection();
         ColumnResolver resolver = context.getResolver();
         TableRef tableRef = context.getCurrentTable();
@@ -286,4 +286,4 @@ public class QueryCompiler {
             return new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/be8c2fb5/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7f12117..4de4934 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -332,10 +332,10 @@ public class UpsertCompiler {
             SelectStatement select = upsert.getSelect();
             assert(select != null);
             select = addTenantAndViewConstants(table, select, tenantId, addViewColumns);
-            TableRef selectTableRef = FromCompiler.getResolver(select, connection).getTables().get(0);
-            sameTable = tableRef.equals(selectTableRef);
+            sameTable = select.getFrom().size() == 1
+                && tableRef.equals(FromCompiler.getResolver(select, connection).getTables().get(0));
             /* We can run the upsert in a coprocessor if:
-             * 1) the into table matches from table
+             * 1) from has only 1 table and the into table matches from table
              * 2) the select query isn't doing aggregation
              * 3) autoCommit is on
              * 4) the table is not immutable, as the client is the one that figures out the additional

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/be8c2fb5/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java b/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java
index c7f95ec..7c1b50f 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/end2end/HashJoinTest.java
@@ -1435,5 +1435,93 @@ public class HashJoinTest extends BaseHBaseManagedTimeTest {
             conn.close();
         }
     }
+    
+    @Test
+    public void testUpsertWithJoin() throws Exception {
+        String tempTable = "TEMP_JOINED_TABLE";
+        String upsertQuery1 = "UPSERT INTO " + tempTable + "(order_id, item_name, supplier_name, quantity, date) " 
+            + "SELECT order_id, i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE + " o LEFT JOIN " 
+            + JOIN_ITEM_TABLE + " i ON o.item_id = i.item_id LEFT JOIN "
+            + JOIN_SUPPLIER_TABLE + " s ON i.supplier_id = s.supplier_id";
+        String upsertQuery2 = "UPSERT INTO " + tempTable + "(order_id, item_name, quantity) " 
+            + "SELECT 'ORDER_SUM', i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE + " o LEFT JOIN " 
+            + JOIN_ITEM_TABLE + " i ON o.item_id = i.item_id GROUP BY i.name ORDER BY i.name";
+        String query = "SELECT * FROM " + tempTable;
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(PHOENIX_JDBC_URL, props);
+        conn.setAutoCommit(true);
+        try {
+            conn.createStatement().execute("CREATE TABLE " + tempTable 
+                    + "   (order_id varchar not null, " 
+                    + "    item_name varchar not null, " 
+                    + "    supplier_name varchar, "
+                    + "    quantity integer, "
+                    + "    date timestamp " 
+                    + "    CONSTRAINT pk PRIMARY KEY (order_id, item_name))");
+            conn.createStatement().execute(upsertQuery1);
+            conn.createStatement().execute(upsertQuery2);
+            
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "ORDER_SUM");
+            assertEquals(rs.getString(2), "T1");
+            assertNull(rs.getString(3));
+            assertEquals(rs.getInt(4), 1000);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "ORDER_SUM");
+            assertEquals(rs.getString(2), "T2");
+            assertNull(rs.getString(3));
+            assertEquals(rs.getInt(4), 3000);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "ORDER_SUM");
+            assertEquals(rs.getString(2), "T3");
+            assertNull(rs.getString(3));
+            assertEquals(rs.getInt(4), 5000);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "ORDER_SUM");
+            assertEquals(rs.getString(2), "T6");
+            assertNull(rs.getString(3));
+            assertEquals(rs.getInt(4), 6000);
+            assertNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
 
 }