You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/05/31 18:16:28 UTC

phoenix git commit: PHOENIX-2958 Join works incorrectly with DESC columns in Calcite-Phoenix

Repository: phoenix
Updated Branches:
  refs/heads/calcite f6c370f56 -> d94c45220


PHOENIX-2958 Join works incorrectly with DESC columns in Calcite-Phoenix


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d94c4522
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d94c4522
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d94c4522

Branch: refs/heads/calcite
Commit: d94c4522059b040d9541a2da60469aa001a662f5
Parents: f6c370f
Author: maryannxue <ma...@gmail.com>
Authored: Tue May 31 14:16:21 2016 -0400
Committer: maryannxue <ma...@gmail.com>
Committed: Tue May 31 14:16:21 2016 -0400

----------------------------------------------------------------------
 .../apache/phoenix/calcite/BaseCalciteIT.java   | 116 +++++++++++++++++++
 .../org/apache/phoenix/calcite/CalciteIT.java   |  20 +++-
 .../calcite/rel/PhoenixAbstractJoin.java        |  14 ++-
 .../phoenix/calcite/rel/PhoenixServerJoin.java  |  14 ++-
 4 files changed, 157 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d94c4522/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
index 9ec77c5..98a3b01 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
@@ -478,6 +478,122 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT {
         conn.close();        
     }
     
+    protected static final String KEY_ORDERING_TABLE_1_NAME = "key_ordering_test_table_1";
+    protected static final String KEY_ORDERING_TABLE_2_NAME = "key_ordering_test_table_2";
+    
+    protected void initKeyOrderingTable() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute(
+                    "CREATE TABLE " + KEY_ORDERING_TABLE_1_NAME
+                    + "(k0 BIGINT NOT NULL, k1 INTEGER NOT NULL, v0 INTEGER, v1 BIGINT CONSTRAINT pk PRIMARY KEY (k0 DESC, k1))");
+            conn.createStatement().execute(
+                    "CREATE TABLE " + KEY_ORDERING_TABLE_2_NAME
+                    + "(k0 BIGINT NOT NULL, k1 BIGINT NOT NULL, v0 INTEGER, v1 BIGINT CONSTRAINT pk PRIMARY KEY (k0 DESC, k1 DESC))");
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + KEY_ORDERING_TABLE_1_NAME
+                    + " VALUES(?, ?, ?, ?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setInt(3, 1);
+            stmt.setInt(4, 2);
+            stmt.execute();
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 3);
+            stmt.setInt(3, 1);
+            stmt.setInt(4, 3);
+            stmt.execute();
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 4);
+            stmt.setInt(3, 1);
+            stmt.setInt(4, 4);
+            stmt.execute();
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 5);
+            stmt.setInt(3, 1);
+            stmt.setInt(4, 5);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setInt(2, 3);
+            stmt.setInt(3, 2);
+            stmt.setInt(4, 3);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setInt(2, 5);
+            stmt.setInt(3, 2);
+            stmt.setInt(4, 5);
+            stmt.execute();
+            stmt.setInt(1, 3);
+            stmt.setInt(2, 2);
+            stmt.setInt(3, 3);
+            stmt.setInt(4, 2);
+            stmt.execute();
+            stmt.setInt(1, 5);
+            stmt.setInt(2, 2);
+            stmt.setInt(3, 5);
+            stmt.setInt(4, 2);
+            stmt.execute();
+            stmt.setInt(1, 5);
+            stmt.setInt(2, 5);
+            stmt.setInt(3, 5);
+            stmt.setInt(4, 5);
+            stmt.execute();
+            conn.commit();
+            stmt = conn.prepareStatement(
+                    "UPSERT INTO " + KEY_ORDERING_TABLE_2_NAME
+                    + " VALUES(?, ?, ?, ?)");
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 2);
+            stmt.setInt(3, 1);
+            stmt.setInt(4, 2);
+            stmt.execute();
+            stmt.setInt(1, 1);
+            stmt.setInt(2, 5);
+            stmt.setInt(3, 1);
+            stmt.setInt(4, 5);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setInt(2, 2);
+            stmt.setInt(3, 2);
+            stmt.setInt(4, 2);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setInt(2, 3);
+            stmt.setInt(3, 2);
+            stmt.setInt(4, 3);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setInt(2, 4);
+            stmt.setInt(3, 2);
+            stmt.setInt(4, 4);
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setInt(2, 5);
+            stmt.setInt(3, 2);
+            stmt.setInt(4, 5);
+            stmt.execute();
+            stmt.setInt(1, 4);
+            stmt.setInt(2, 3);
+            stmt.setInt(3, 4);
+            stmt.setInt(4, 3);
+            stmt.execute();
+            stmt.setInt(1, 5);
+            stmt.setInt(2, 4);
+            stmt.setInt(3, 5);
+            stmt.setInt(4, 4);
+            stmt.execute();
+            stmt.setInt(1, 5);
+            stmt.setInt(2, 5);
+            stmt.setInt(3, 5);
+            stmt.setInt(4, 5);
+            stmt.execute();
+            conn.commit();
+        } catch (TableAlreadyExistsException e) {
+        }
+        conn.close();        
+    }
+    
     protected static final String MULTI_TENANT_TABLE = "multitenant_test_table";
     protected static final String MULTI_TENANT_TABLE_INDEX = "idx_multitenant_test_table";
     protected static final String MULTI_TENANT_VIEW1 = "s1.multitenant_test_view1";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d94c4522/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 430cde3..ab43658 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -44,6 +44,7 @@ public class CalciteIT extends BaseCalciteIT {
         initJoinTableValues(url, null, null);
         initArrayTable();
         initSaltedTables(null);
+        initKeyOrderingTable();
         final Connection connection = DriverManager.getConnection(url);
         connection.createStatement().execute("CREATE VIEW IF NOT EXISTS v AS SELECT * from aTable where a_string = 'a'");
         connection.createStatement().execute("CREATE SEQUENCE IF NOT EXISTS seq0 START WITH 1 INCREMENT BY 1");
@@ -208,7 +209,7 @@ public class CalciteIT extends BaseCalciteIT {
                            "        PhoenixTableScan(table=[[phoenix, ATABLE]], scanOrder=[FORWARD])\n" +
                            "      PhoenixServerProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[$2])\n" +
                            "        PhoenixTableScan(table=[[phoenix, ATABLE]], scanOrder=[FORWARD])\n")
-                .resultIs(0, new Object[][] {
+                .resultIs(new Object[][] {
                           {"00A123122312312", "a", "00D300000000XHP"},
                           {"00A223122312312", "a", "00D300000000XHP"},
                           {"00A323122312312", "a", "00D300000000XHP"},
@@ -219,6 +220,23 @@ public class CalciteIT extends BaseCalciteIT {
                           {"00B823122312312", "b", "00D300000000XHP"},
                           {"00C923122312312", "c", "00D300000000XHP"}})
                 .close();
+        
+        start(false, 100000f).sql("select t1.k0, t1.k1, t2.k0, t2.k1 from " + KEY_ORDERING_TABLE_1_NAME + " t1 join " + KEY_ORDERING_TABLE_2_NAME + " t2 on t1.k0 = t2.k0 and t1.k1 = t2.k1")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixClientProject(K0=[$0], K1=[$1], K00=[$3], K10=[$4])\n" +
+                           "    PhoenixClientJoin(condition=[AND(=($0, $3), =($2, $4))], joinType=[inner])\n" +
+                           "      PhoenixServerSort(sort0=[$0], sort1=[$2], dir0=[ASC], dir1=[ASC])\n" +
+                           "        PhoenixServerProject(K0=[$0], K1=[$1], K14=[CAST($1):BIGINT NOT NULL])\n" +
+                           "          PhoenixTableScan(table=[[phoenix, KEY_ORDERING_TEST_TABLE_1]])\n" +
+                           "      PhoenixServerProject(K0=[$0], K1=[$1])\n" +
+                           "        PhoenixTableScan(table=[[phoenix, KEY_ORDERING_TEST_TABLE_2]], scanOrder=[REVERSE])\n")
+                .resultIs(new Object[][] {
+                           {1L, 2, 1L, 2L},
+                           {1L, 5, 1L, 5L},
+                           {2L, 3, 2L, 3L},
+                           {2L, 5, 2L, 5L},
+                           {5L, 5, 5L, 5L}})
+                .close();
     }
     
     @Test public void testJoinPlanningWithCollation() throws Exception { 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d94c4522/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
index 93960bc..42d9e0d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
@@ -1,5 +1,6 @@
 package org.apache.phoenix.calcite.rel;
 
+import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -15,8 +16,10 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.SortOrder;
 
 /**
  * Implementation of {@link org.apache.calcite.rel.core.Join}
@@ -59,7 +62,16 @@ abstract public class PhoenixAbstractJoin extends Join implements PhoenixQueryRe
             ImmutableIntList keys = index == 0 ? joinInfo.leftKeys : joinInfo.rightKeys;
             for (Iterator<Integer> iter = keys.iterator(); iter.hasNext();) {
                 Integer i = iter.next();
-                conditionExprs.add(implementor.newColumnExpression(i));
+                Expression e = implementor.newColumnExpression(i);
+                if (e.getSortOrder() == SortOrder.DESC) {
+                    try {
+                        e = CoerceExpression.create(
+                                e, e.getDataType(), SortOrder.ASC, e.getMaxLength());
+                    } catch (SQLException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                }
+                conditionExprs.add(e);
             }
             if (conditionExprs.isEmpty()) {
                 conditionExprs.add(LiteralExpression.newConstant(0));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d94c4522/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
index acb14ae..7bbf37c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -82,20 +82,24 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin {
         if (joinType == JoinRelType.FULL || joinType == JoinRelType.RIGHT)
             return planner.getCostFactory().makeInfiniteCost();
         
-        //TODO return infinite cost if RHS size exceeds memory limit.
-        
         double rowCount = mq.getRowCount(this);
-
         double leftRowCount = mq.getRowCount(getLeft());
+        double rightRowCount = mq.getRowCount(getRight());
+        double rightRowSize = mq.getAverageRowSize(getRight());
+        double rightSize = rightRowCount * rightRowSize;
+        
+        //TODO refine this number and apply a estimated limit value from config as well.
+        double sizeLimit = 100000000d;
+        if (rightSize > sizeLimit)
+            return planner.getCostFactory().makeInfiniteCost();
+        
         if (Double.isInfinite(leftRowCount)) {
             rowCount = leftRowCount;
         } else {
             rowCount += leftRowCount;
-            double rightRowCount = mq.getRowCount(getRight());
             if (Double.isInfinite(rightRowCount)) {
                 rowCount = rightRowCount;
             } else {
-                double rightRowSize = mq.getAverageRowSize(getRight());
                 rowCount += (rightRowCount + rightRowCount * rightRowSize);
             }
         }