You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/08/24 19:49:18 UTC

phoenix git commit: PHOENIX-2202 Implement PhoenixUncollect

Repository: phoenix
Updated Branches:
  refs/heads/calcite bb519e51c -> 1f5ee4fcc


PHOENIX-2202 Implement PhoenixUncollect


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1f5ee4fc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1f5ee4fc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1f5ee4fc

Branch: refs/heads/calcite
Commit: 1f5ee4fcc103e9a247a422e4704eda89e3132342
Parents: bb519e5
Author: maryannxue <we...@intel.com>
Authored: Mon Aug 24 13:49:04 2015 -0400
Committer: maryannxue <we...@intel.com>
Committed: Mon Aug 24 13:49:04 2015 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteIT.java   | 46 +++++++++++++++
 .../apache/phoenix/calcite/PhoenixTable.java    | 10 +++-
 .../phoenix/calcite/rel/PhoenixUncollect.java   | 59 ++++++++++++++++++++
 .../calcite/rules/PhoenixConverterRules.java    | 26 +++++++++
 .../apache/phoenix/execute/UnnestArrayPlan.java | 10 ++++
 5 files changed, 148 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
index 4f22873..60fa6ea 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteIT.java
@@ -289,6 +289,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         ensureTableCreated(url, ATABLE_NAME);
         initATableValues(getOrganizationId(), null, url);
         initJoinTableValues(url, null, null);
+        initArrayTable();
         createIndices(
                 "CREATE INDEX IDX1 ON aTable (a_string) INCLUDE (b_string, x_integer)",
                 "CREATE INDEX IDX2 ON aTable (b_string) INCLUDE (a_string, y_integer)",
@@ -300,6 +301,7 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         connection.createStatement().execute("UPDATE STATISTICS " + JOIN_ITEM_TABLE_FULL_NAME);
         connection.createStatement().execute("UPDATE STATISTICS " + JOIN_SUPPLIER_TABLE_FULL_NAME);
         connection.createStatement().execute("UPDATE STATISTICS " + JOIN_ORDER_TABLE_FULL_NAME);
+        connection.createStatement().execute("UPDATE STATISTICS " + SCORES_TABLE_NAME);
         connection.createStatement().execute("UPDATE STATISTICS IDX1");
         connection.createStatement().execute("UPDATE STATISTICS IDX2");
         connection.createStatement().execute("UPDATE STATISTICS IDX_FULL");
@@ -318,6 +320,33 @@ public class CalciteIT extends BaseClientManagedTimeIT {
         conn.close();        
     }
     
+    protected static final String SCORES_TABLE_NAME = "scores";
+    
+    protected void initArrayTable() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute(
+                    "CREATE TABLE " + SCORES_TABLE_NAME
+                    + "(student_id INTEGER PRIMARY KEY, scores INTEGER[])");
+            PreparedStatement stmt = conn.prepareStatement(
+                    "UPSERT INTO " + SCORES_TABLE_NAME
+                    + " VALUES(?, ?)");
+            stmt.setInt(1, 1);
+            stmt.setArray(2, conn.createArrayOf("INTEGER", new Integer[] {85, 80, 82}));
+            stmt.execute();
+            stmt.setInt(1, 2);
+            stmt.setArray(2, null);
+            stmt.execute();
+            stmt.setInt(1, 3);
+            stmt.setArray(2, conn.createArrayOf("INTEGER", new Integer[] {87, 88, 80}));
+            stmt.execute();
+            conn.commit();
+        } catch (TableAlreadyExistsException e) {
+        }
+        conn.close();        
+    }
+    
     @Test public void testTableScan() throws Exception {
         start().sql("select * from aTable where a_string = 'a'")
                 .explainIs("PhoenixToEnumerableConverter\n" +
@@ -1006,6 +1035,23 @@ public class CalciteIT extends BaseClientManagedTimeIT {
                 .close();
     }
     
+    @Test public void testUnnest() {
+        start().sql("SELECT t.s FROM UNNEST((SELECT scores FROM " + SCORES_TABLE_NAME + ")) AS t(s)")
+                .explainIs("PhoenixToEnumerableConverter\n" +
+                           "  PhoenixUncollect\n" +
+                           "    PhoenixToClientConverter\n" +
+                           "      PhoenixServerProject(EXPR$0=[$1])\n" +
+                           "        PhoenixTableScan(table=[[phoenix, SCORES]])\n")
+                .resultIs(new Object[][] {
+                        {85}, 
+                        {80}, 
+                        {82}, 
+                        {87}, 
+                        {88}, 
+                        {80}})
+                .close();
+    }
+    
     @Test public void testSelectFromView() {
         start().sql("select * from v")
                 .explainIs("PhoenixToEnumerableConverter\n" +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
index 1ea0be3..cfda441 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixTable.java
@@ -67,9 +67,13 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable {
     public RelDataType getRowType(RelDataTypeFactory typeFactory) {
         final RelDataTypeFactory.FieldInfoBuilder builder = typeFactory.builder();
         for (PColumn pColumn : pTable.getColumns()) {
-            final int sqlTypeId = pColumn.getDataType().getSqlType();
+            final PDataType baseType = 
+                    pColumn.getDataType().isArrayType() ?
+                            PDataType.fromTypeId(pColumn.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE) 
+                          : pColumn.getDataType();
+            final int sqlTypeId = baseType.getResultSetSqlType();
             final PDataType pDataType = PDataType.fromTypeId(sqlTypeId);
-            final SqlTypeName sqlTypeName1 = SqlTypeName.valueOf(pDataType.isArrayType() ? PDataType.fromTypeId(pDataType.getSqlType() - PDataType.ARRAY_TYPE_BASE).getSqlTypeName() : pDataType.getSqlTypeName());
+            final SqlTypeName sqlTypeName1 = SqlTypeName.valueOf(pDataType.getSqlTypeName());
             final Integer maxLength = pColumn.getMaxLength();
             final Integer scale = pColumn.getScale();
             RelDataType type;
@@ -80,7 +84,7 @@ public class PhoenixTable extends AbstractTable implements TranslatableTable {
             } else {
                 type = typeFactory.createSqlType(sqlTypeName1);
             }
-            if (pDataType.isArrayType()) {
+            if (pColumn.getDataType().isArrayType()) {
                 final Integer arraySize = pColumn.getArraySize();
                 type = typeFactory.createArrayType(type, arraySize == null ? -1 : arraySize);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
new file mode 100644
index 0000000..aa53ae4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
@@ -0,0 +1,59 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+import java.util.Arrays;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Uncollect;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.execute.UnnestArrayPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.schema.types.PDataType;
+
+public class PhoenixUncollect extends Uncollect implements PhoenixRel {
+    
+    public static PhoenixUncollect create(RelNode input) {
+        RelOptCluster cluster = input.getCluster();
+        RelTraitSet traits = cluster.traitSetOf(PhoenixRel.CLIENT_CONVENTION);
+        return new PhoenixUncollect(cluster, traits, input);
+    }
+
+    private PhoenixUncollect(RelOptCluster cluster, RelTraitSet traitSet,
+            RelNode child) {
+        super(cluster, traitSet, child);
+    }
+
+    @Override
+    public PhoenixUncollect copy(RelTraitSet traitSet,
+        RelNode newInput) {
+        return create(newInput);
+    }
+
+    @Override
+    public RelOptCost computeSelfCost(RelOptPlanner planner) {
+        if (getInput().getConvention() != PhoenixRel.CLIENT_CONVENTION)
+            return planner.getCostFactory().makeInfiniteCost();
+        
+        return super.computeSelfCost(planner).multiplyBy(PHOENIX_FACTOR);
+    }
+    
+    @Override
+    public QueryPlan implement(Implementor implementor) {
+        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        Expression arrayExpression = implementor.newColumnExpression(0);
+        @SuppressWarnings("rawtypes")
+        PDataType baseType = PDataType.fromTypeId(arrayExpression.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE);
+        try {
+            implementor.project(Arrays.<Expression> asList(LiteralExpression.newConstant(null, baseType)));
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        return new UnnestArrayPlan(plan, arrayExpression, false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index 4179e0a..210306d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
+import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.core.Union;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
@@ -49,6 +50,7 @@ import org.apache.phoenix.calcite.rel.PhoenixServerProject;
 import org.apache.phoenix.calcite.rel.PhoenixServerSort;
 import org.apache.phoenix.calcite.rel.PhoenixToClientConverter;
 import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter;
+import org.apache.phoenix.calcite.rel.PhoenixUncollect;
 import org.apache.phoenix.calcite.rel.PhoenixUnion;
 import org.apache.phoenix.calcite.rel.PhoenixValues;
 
@@ -84,6 +86,7 @@ public class PhoenixConverterRules {
         PhoenixClientJoinRule.INSTANCE,
         PhoenixServerJoinRule.INSTANCE,
         PhoenixValuesRule.INSTANCE,
+        PhoenixUncollectRule.INSTANCE,
     };
 
     public static final RelOptRule[] CONVERTIBLE_RULES = {
@@ -104,6 +107,7 @@ public class PhoenixConverterRules {
         PhoenixClientJoinRule.CONVERTIBLE,
         PhoenixServerJoinRule.CONVERTIBLE,
         PhoenixValuesRule.INSTANCE,
+        PhoenixUncollectRule.INSTANCE,
     };
 
     /** Base class for planner rules that convert a relational expression to
@@ -598,6 +602,28 @@ public class PhoenixConverterRules {
     }
 
     /**
+     * Rule to convert a {@link org.apache.calcite.rel.core.Uncollect} to a
+     * {@link PhoenixUncollect}.
+     */
+    public static class PhoenixUncollectRule extends PhoenixConverterRule {
+        
+        private static final PhoenixUncollectRule INSTANCE = new PhoenixUncollectRule();
+
+        private PhoenixUncollectRule() {
+            super(Uncollect.class, Convention.NONE, 
+                    PhoenixRel.CLIENT_CONVENTION, "PhoenixUncollectRule");
+        }
+
+        public RelNode convert(RelNode rel) {
+            final Uncollect uncollect = (Uncollect) rel;
+            return PhoenixUncollect.create(
+                convert(
+                        uncollect.getInput(), 
+                        uncollect.getInput().getTraitSet().replace(out)));
+        }
+    }
+
+    /**
      * Rule to convert an {@link org.apache.calcite.rel.logical.LogicalIntersect}
      * to an {@link PhoenixIntersectRel}.
      o/

http://git-wip-us.apache.org/repos/asf/phoenix/blob/1f5ee4fc/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index c4a6b20..125baf3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.expression.BaseSingleExpression;
 import org.apache.phoenix.expression.BaseTerminalExpression;
@@ -180,4 +181,13 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
             return PInteger.INSTANCE;
         }
     }
+
+    @Override
+    public QueryPlan limit(Integer limit) {
+        if (limit == null)
+            return this;
+        
+        return new ClientScanPlan(this.getContext(), this.getStatement(), this.getTableRef(),
+                this.getProjector(), limit, null, OrderBy.EMPTY_ORDER_BY, this);
+    }
 }