You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/05 06:06:28 UTC

phoenix git commit: Make projection work with new Phoenix master

Repository: phoenix
Updated Branches:
  refs/heads/calcite 026f60b12 -> de6c14a14


Make projection work with new Phoenix master


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/de6c14a1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/de6c14a1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/de6c14a1

Branch: refs/heads/calcite
Commit: de6c14a14d7ab65f8236744199a7f870c003510f
Parents: 026f60b
Author: maryannxue <we...@intel.com>
Authored: Thu Mar 5 00:06:16 2015 -0500
Committer: maryannxue <we...@intel.com>
Committed: Thu Mar 5 00:06:16 2015 -0500

----------------------------------------------------------------------
 .../org/apache/phoenix/calcite/CalciteTest.java |  4 +-
 .../apache/phoenix/calcite/PhoenixProject.java  |  2 +-
 .../org/apache/phoenix/calcite/PhoenixRel.java  |  4 +-
 .../calcite/PhoenixRelImplementorImpl.java      | 89 ++++++++++++++++----
 .../expression/ProjectedColumnExpression.java   |  9 ++
 5 files changed, 87 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index 9bce0a3..6604bde 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -182,9 +182,9 @@ public class CalciteTest extends BaseClientManagedTimeIT {
             new PhoenixSchema(phoenixConnection));
         calciteConnection.setSchema("phoenix");
         final Statement statement = calciteConnection.createStatement();
-        final ResultSet resultSet = statement.executeQuery("select * from aTable where a_string = 'a'");
+        final ResultSet resultSet = statement.executeQuery("select entity_id, a_string, organization_id from aTable where a_string = 'a'");
         while (resultSet.next()) {
-            System.out.println("org_id=" + resultSet.getObject(1) + ",entity_id=" + resultSet.getObject(2) + ",a_string=" + resultSet.getObject("A_STRING"));
+            System.out.println("org_id=" + resultSet.getObject(3) + ",entity_id=" + resultSet.getObject(1) + ",a_string=" + resultSet.getObject("A_STRING"));
         }
         resultSet.close();
         statement.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
index 2e90397..8a1b6b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
@@ -34,7 +34,7 @@ public class PhoenixProject extends Project implements PhoenixRel {
 
     @Override
     public void implement(Implementor implementor, PhoenixConnection conn) {
+        implementor.setProjects(getProjects());
         implementor.visitInput(0, (PhoenixRel) getInput());
-        throw new UnsupportedOperationException();
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
index c255e90..95088a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
@@ -1,10 +1,11 @@
 package org.apache.phoenix.calcite;
 
+import java.util.List;
+
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rex.RexNode;
 import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.expression.ColumnExpression;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.schema.PTable;
@@ -33,6 +34,7 @@ public interface PhoenixRel extends RelNode {
   interface Implementor {
     void visitInput(int i, PhoenixRel input);
     ColumnExpression newColumnExpression(int index);
+    void setProjects(List<? extends RexNode> projects);
     void setContext(PhoenixConnection conn, PTable pTable, RexNode filter);
     QueryPlan makePlan();
   }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
index a833b6b..f672a9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
@@ -1,46 +1,51 @@
 package org.apache.phoenix.calcite;
 
 import java.sql.SQLException;
-import java.util.Collections;
+import java.util.List;
 
 import org.apache.calcite.rex.RexNode;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.ExpressionProjector;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.ProjectionCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.compile.WhereOptimizer;
-import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.ColumnExpression;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
 import org.apache.phoenix.iterate.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.ColumnDef;
-import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.NamedTableNode;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
 	private TableRef tableRef;
 	private PhoenixConnection conn;
 	private StatementContext context;
-	private RowProjector projector;
 	private SelectStatement select;
+    private List<? extends RexNode> projects;
+    private List<Expression> projectExpressions;
 	
     @Override
     public void visitInput(int i, PhoenixRel input) {
@@ -67,29 +72,79 @@ class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
 			            TableName.create(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()),
 			            ImmutableList.<ColumnDef>of()), conn);
 	        this.context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
-	        // TODO: real projection
 	        this.select = SelectStatement.SELECT_STAR;
-	        this.projector = ProjectionCompiler.compile(context, select, GroupBy.EMPTY_GROUP_BY, Collections.<PDatum>emptyList());
 	        if (filter != null) {
 	        	Expression filterExpr = CalciteUtils.toExpression(filter, this);
 	        	filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr);
 	        	WhereCompiler.setScanFilter(context, select, filterExpr, true, false);
 	        }
+            this.projectExpressions = Lists.<Expression> newArrayListWithExpectedSize(projects.size());
+            for (RexNode p : projects) {
+                this.projectExpressions.add(CalciteUtils.toExpression(p, this));
+            }
 		} catch (SQLException e) {
 			throw new RuntimeException(e);
 		}
 	}
+	
+    @Override
+    public void setProjects(List<? extends RexNode> projects) {
+        this.projects = projects;
+    }
 
 	@Override
 	public QueryPlan makePlan() {
-            try {
-		Integer limit = null;
-		OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
-		ParallelIteratorFactory iteratorFactory = null;
-                return new ScanPlan(context, select, tableRef, projector, limit, orderBy, iteratorFactory, true);
-            } catch (SQLException e) {
-                throw new RuntimeException(e);
-            }
+	    try {
+	        projectAllColumnFamilies(context.getScan());
+	        TupleProjector tupleProjector = createTupleProjector();
+	        TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+	        Integer limit = null;
+	        OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
+	        ParallelIteratorFactory iteratorFactory = null;
+	        return new ScanPlan(context, select, tableRef, createRowProjector(tupleProjector), limit, orderBy, iteratorFactory, true);
+	    } catch (SQLException e) {
+	        throw new RuntimeException(e);
+	    }
 	}
+    
+    private TupleProjector createTupleProjector() {
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        List<Expression> exprs = this.projectExpressions;
+        if (this.projects == null) {
+            exprs = Lists.<Expression> newArrayList();
+            for (PColumn column : tableRef.getTable().getColumns()) {
+                if (!SchemaUtil.isPKColumn(column)) {
+                    exprs.add(newColumnExpression(column.getPosition()));
+                }
+            }
+        }
+        for (Expression e : exprs) {
+            builder.addField(e);                
+        }
+        
+        return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
+    }
+    
+    private RowProjector createRowProjector(TupleProjector tupleProjector) {
+        List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList();
+        if (this.projects == null) {
+            for (PColumn column : tableRef.getTable().getPKColumns()) {
+                columnProjectors.add(new ExpressionProjector("dummy", "dummy", new ColumnRef(tableRef, column.getPosition()).newColumnExpression(), false));
+            }
+        }
+        
+        for (int i = 0; i < tupleProjector.getSchema().getFieldCount(); i++) {
+            columnProjectors.add(new ExpressionProjector("dummy", "dummy", new ProjectedColumnExpression(tupleProjector.getSchema(), i, "dummy"), false));
+        }
+        
+        return new RowProjector(columnProjectors, 0, false);
+    }
+    
+    private void projectAllColumnFamilies(Scan scan) {
+        scan.getFamilyMap().clear();
+        for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
+            scan.addFamily(family.getName().getBytes());
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index 97d1aff..7956612 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -56,6 +56,15 @@ public class ProjectedColumnExpression extends ColumnExpression {
         this.displayName = displayName;
     }
     
+    public ProjectedColumnExpression(KeyValueSchema schema, int position, String displayName) {
+        super(schema.getField(position));
+        this.columns = Collections.emptyList();
+        this.schema = schema;
+        this.bitSet = ValueBitSet.newInstance(schema);
+        this.position = position;
+        this.displayName = displayName;
+    }
+    
     public static KeyValueSchema buildSchema(Collection<PColumn> columns) {
         KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
         for (PColumn column : columns) {