You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/03/05 06:06:28 UTC
phoenix git commit: Make projection work with new Phoenix master
Repository: phoenix
Updated Branches:
refs/heads/calcite 026f60b12 -> de6c14a14
Make projection work with new Phoenix master
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/de6c14a1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/de6c14a1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/de6c14a1
Branch: refs/heads/calcite
Commit: de6c14a14d7ab65f8236744199a7f870c003510f
Parents: 026f60b
Author: maryannxue <we...@intel.com>
Authored: Thu Mar 5 00:06:16 2015 -0500
Committer: maryannxue <we...@intel.com>
Committed: Thu Mar 5 00:06:16 2015 -0500
----------------------------------------------------------------------
.../org/apache/phoenix/calcite/CalciteTest.java | 4 +-
.../apache/phoenix/calcite/PhoenixProject.java | 2 +-
.../org/apache/phoenix/calcite/PhoenixRel.java | 4 +-
.../calcite/PhoenixRelImplementorImpl.java | 89 ++++++++++++++++----
.../expression/ProjectedColumnExpression.java | 9 ++
5 files changed, 87 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
index 9bce0a3..6604bde 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteTest.java
@@ -182,9 +182,9 @@ public class CalciteTest extends BaseClientManagedTimeIT {
new PhoenixSchema(phoenixConnection));
calciteConnection.setSchema("phoenix");
final Statement statement = calciteConnection.createStatement();
- final ResultSet resultSet = statement.executeQuery("select * from aTable where a_string = 'a'");
+ final ResultSet resultSet = statement.executeQuery("select entity_id, a_string, organization_id from aTable where a_string = 'a'");
while (resultSet.next()) {
- System.out.println("org_id=" + resultSet.getObject(1) + ",entity_id=" + resultSet.getObject(2) + ",a_string=" + resultSet.getObject("A_STRING"));
+ System.out.println("org_id=" + resultSet.getObject(3) + ",entity_id=" + resultSet.getObject(1) + ",a_string=" + resultSet.getObject("A_STRING"));
}
resultSet.close();
statement.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
index 2e90397..8a1b6b7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixProject.java
@@ -34,7 +34,7 @@ public class PhoenixProject extends Project implements PhoenixRel {
@Override
public void implement(Implementor implementor, PhoenixConnection conn) {
+ implementor.setProjects(getProjects());
implementor.visitInput(0, (PhoenixRel) getInput());
- throw new UnsupportedOperationException();
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
index c255e90..95088a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRel.java
@@ -1,10 +1,11 @@
package org.apache.phoenix.calcite;
+import java.util.List;
+
import org.apache.calcite.plan.Convention;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexNode;
import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.expression.ColumnExpression;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.PTable;
@@ -33,6 +34,7 @@ public interface PhoenixRel extends RelNode {
interface Implementor {
void visitInput(int i, PhoenixRel input);
ColumnExpression newColumnExpression(int index);
+ void setProjects(List<? extends RexNode> projects);
void setContext(PhoenixConnection conn, PTable pTable, RexNode filter);
QueryPlan makePlan();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
index a833b6b..f672a9e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/PhoenixRelImplementorImpl.java
@@ -1,46 +1,51 @@
package org.apache.phoenix.calcite;
import java.sql.SQLException;
-import java.util.Collections;
+import java.util.List;
import org.apache.calcite.rex.RexNode;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.ColumnProjector;
import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.ExpressionProjector;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.ProjectionCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.WhereCompiler;
import org.apache.phoenix.compile.WhereOptimizer;
-import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.ColumnExpression;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ProjectedColumnExpression;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.parse.ColumnDef;
-import org.apache.phoenix.parse.FilterableStatement;
import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.parse.TableName;
import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.SchemaUtil;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
private TableRef tableRef;
private PhoenixConnection conn;
private StatementContext context;
- private RowProjector projector;
private SelectStatement select;
+ private List<? extends RexNode> projects;
+ private List<Expression> projectExpressions;
@Override
public void visitInput(int i, PhoenixRel input) {
@@ -67,29 +72,79 @@ class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
TableName.create(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()),
ImmutableList.<ColumnDef>of()), conn);
this.context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
- // TODO: real projection
this.select = SelectStatement.SELECT_STAR;
- this.projector = ProjectionCompiler.compile(context, select, GroupBy.EMPTY_GROUP_BY, Collections.<PDatum>emptyList());
if (filter != null) {
Expression filterExpr = CalciteUtils.toExpression(filter, this);
filterExpr = WhereOptimizer.pushKeyExpressionsToScan(context, select, filterExpr);
WhereCompiler.setScanFilter(context, select, filterExpr, true, false);
}
+ this.projectExpressions = Lists.<Expression> newArrayListWithExpectedSize(projects.size());
+ for (RexNode p : projects) {
+ this.projectExpressions.add(CalciteUtils.toExpression(p, this));
+ }
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+
+ @Override
+ public void setProjects(List<? extends RexNode> projects) {
+ this.projects = projects;
+ }
@Override
public QueryPlan makePlan() {
- try {
- Integer limit = null;
- OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
- ParallelIteratorFactory iteratorFactory = null;
- return new ScanPlan(context, select, tableRef, projector, limit, orderBy, iteratorFactory, true);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ try {
+ projectAllColumnFamilies(context.getScan());
+ TupleProjector tupleProjector = createTupleProjector();
+ TupleProjector.serializeProjectorIntoScan(context.getScan(), tupleProjector);
+ Integer limit = null;
+ OrderBy orderBy = OrderBy.EMPTY_ORDER_BY;
+ ParallelIteratorFactory iteratorFactory = null;
+ return new ScanPlan(context, select, tableRef, createRowProjector(tupleProjector), limit, orderBy, iteratorFactory, true);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
+
+ private TupleProjector createTupleProjector() {
+ KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+ List<Expression> exprs = this.projectExpressions;
+ if (this.projects == null) {
+ exprs = Lists.<Expression> newArrayList();
+ for (PColumn column : tableRef.getTable().getColumns()) {
+ if (!SchemaUtil.isPKColumn(column)) {
+ exprs.add(newColumnExpression(column.getPosition()));
+ }
+ }
+ }
+ for (Expression e : exprs) {
+ builder.addField(e);
+ }
+
+ return new TupleProjector(builder.build(), exprs.toArray(new Expression[exprs.size()]));
+ }
+
+ private RowProjector createRowProjector(TupleProjector tupleProjector) {
+ List<ColumnProjector> columnProjectors = Lists.<ColumnProjector>newArrayList();
+ if (this.projects == null) {
+ for (PColumn column : tableRef.getTable().getPKColumns()) {
+ columnProjectors.add(new ExpressionProjector("dummy", "dummy", new ColumnRef(tableRef, column.getPosition()).newColumnExpression(), false));
+ }
+ }
+
+ for (int i = 0; i < tupleProjector.getSchema().getFieldCount(); i++) {
+ columnProjectors.add(new ExpressionProjector("dummy", "dummy", new ProjectedColumnExpression(tupleProjector.getSchema(), i, "dummy"), false));
+ }
+
+ return new RowProjector(columnProjectors, 0, false);
+ }
+
+ private void projectAllColumnFamilies(Scan scan) {
+ scan.getFamilyMap().clear();
+ for (PColumnFamily family : tableRef.getTable().getColumnFamilies()) {
+ scan.addFamily(family.getName().getBytes());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/de6c14a1/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index 97d1aff..7956612 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -56,6 +56,15 @@ public class ProjectedColumnExpression extends ColumnExpression {
this.displayName = displayName;
}
+ public ProjectedColumnExpression(KeyValueSchema schema, int position, String displayName) {
+ super(schema.getField(position));
+ this.columns = Collections.emptyList();
+ this.schema = schema;
+ this.bitSet = ValueBitSet.newInstance(schema);
+ this.position = position;
+ this.displayName = displayName;
+ }
+
public static KeyValueSchema buildSchema(Collection<PColumn> columns) {
KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
for (PColumn column : columns) {