You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/05/03 19:55:08 UTC
[2/2] phoenix git commit: Add PhoenixTableModify and code restructure
to introduce MutationPlan (part of PHOENIX-2197 Support DML in
Phoenix/Calcite integration)
Add PhoenixTableModify and code restructure to introduce MutationPlan (part of PHOENIX-2197 Support DML in Phoenix/Calcite integration)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3425347e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3425347e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3425347e
Branch: refs/heads/calcite
Commit: 3425347e36801252efb927f1abd2eb6a5175a9af
Parents: 95479c1
Author: maryannxue <ma...@gmail.com>
Authored: Tue May 3 13:54:55 2016 -0400
Committer: maryannxue <ma...@gmail.com>
Committed: Tue May 3 13:54:55 2016 -0400
----------------------------------------------------------------------
.../apache/phoenix/calcite/BaseCalciteIT.java | 8 +-
.../apache/phoenix/calcite/CalciteDMLIT.java | 29 ++++
.../apache/phoenix/calcite/CalciteUtils.java | 72 +++++-----
.../calcite/rel/PhoenixAbstractAggregate.java | 8 +-
.../calcite/rel/PhoenixAbstractJoin.java | 6 +-
.../calcite/rel/PhoenixAbstractProject.java | 4 +-
.../calcite/rel/PhoenixAbstractSemiJoin.java | 6 +-
.../calcite/rel/PhoenixAbstractSort.java | 4 +-
.../calcite/rel/PhoenixClientAggregate.java | 4 +-
.../phoenix/calcite/rel/PhoenixClientJoin.java | 3 +-
.../calcite/rel/PhoenixClientProject.java | 4 +-
.../calcite/rel/PhoenixClientSemiJoin.java | 6 +-
.../phoenix/calcite/rel/PhoenixClientSort.java | 4 +-
.../calcite/rel/PhoenixCompactClientSort.java | 4 +-
.../phoenix/calcite/rel/PhoenixConvention.java | 7 +-
.../phoenix/calcite/rel/PhoenixCorrelate.java | 9 +-
.../phoenix/calcite/rel/PhoenixFilter.java | 6 +-
.../phoenix/calcite/rel/PhoenixLimit.java | 6 +-
.../calcite/rel/PhoenixMergeSortUnion.java | 6 +-
.../phoenix/calcite/rel/PhoenixQueryRel.java | 7 +
.../apache/phoenix/calcite/rel/PhoenixRel.java | 51 +-------
.../calcite/rel/PhoenixRelImplementor.java | 50 +++++++
.../calcite/rel/PhoenixRelImplementorImpl.java | 5 +-
.../calcite/rel/PhoenixServerAggregate.java | 4 +-
.../phoenix/calcite/rel/PhoenixServerJoin.java | 3 +-
.../calcite/rel/PhoenixServerProject.java | 5 +-
.../calcite/rel/PhoenixServerSemiJoin.java | 3 +-
.../phoenix/calcite/rel/PhoenixServerSort.java | 4 +-
.../phoenix/calcite/rel/PhoenixTableModify.java | 131 +++++++++++++++++++
.../phoenix/calcite/rel/PhoenixTableScan.java | 6 +-
.../calcite/rel/PhoenixTemporarySort.java | 2 +-
.../rel/PhoenixToEnumerableConverter.java | 17 ++-
.../phoenix/calcite/rel/PhoenixUncollect.java | 6 +-
.../phoenix/calcite/rel/PhoenixUnion.java | 6 +-
.../phoenix/calcite/rel/PhoenixValues.java | 4 +-
.../calcite/rules/PhoenixConverterRules.java | 39 ++++++
.../phoenix/calcite/ToExpressionTest.java | 4 +-
37 files changed, 385 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
index 607a93f..b171714 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
@@ -161,7 +161,6 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT {
return this;
}
-
public boolean execute() throws SQLException {
final Statement statement = start.getConnection().createStatement();
final boolean execute = statement.execute(sql);
@@ -178,6 +177,13 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT {
statement.close();
return list;
}
+ /** Runs {@code sql} through {@code Statement.executeUpdate}, closing the statement even if execution throws, and returns this object for chaining. */
+ public Sql executeUpdate() throws SQLException {
+     try (Statement statement = start.getConnection().createStatement()) {
+         statement.executeUpdate(sql);
+     }
+     return this;
+ }
public void close() {
start.close();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
new file mode 100644
index 0000000..eb18a45
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
@@ -0,0 +1,29 @@
+package org.apache.phoenix.calcite;
+
+import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
+
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Test;
+/** Integration test for DML statements (currently an UPSERT ... VALUES plan check) run through the BaseCalciteIT harness. */
+public class CalciteDMLIT extends BaseCalciteIT {
+ private static final Properties PROPS = new Properties();
+ /** Before each test, calls ensureTableCreated for the table named by ATABLE_NAME. */
+ @Before
+ public void initTable() throws Exception {
+ final String url = getUrl();
+ ensureTableCreated(url, ATABLE_NAME);
+ }
+ /** Asserts the explain output for an UPSERT ... VALUES statement into atable; the statement itself is not executed here. */
+ @Test
+ public void testUpsertValues() throws Exception {
+ start(PROPS).sql("upsert into atable(organization_id, entity_id) values('1', '1')")
+ .explainIs("PhoenixToEnumerableConverter\n" +
+ " PhoenixTableModify(table=[[phoenix, ATABLE]], operation=[INSERT], updateColumnList=[[]], flattened=[false])\n" +
+ " PhoenixClientProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[null], B_STRING=[null], A_INTEGER=[null], A_DATE=[null], A_TIME=[null], A_TIMESTAMP=[null], X_DECIMAL=[null], X_LONG=[null], X_INTEGER=[null], Y_INTEGER=[null], A_BYTE=[null], A_SHORT=[null], A_FLOAT=[null], A_DOUBLE=[null], A_UNSIGNED_FLOAT=[null], A_UNSIGNED_DOUBLE=[null])\n" +
+ " PhoenixValues(tuples=[[{ '1 ', '1 ' }]])\n")
+ // NOTE(review): .executeUpdate() is left disabled, so only the plan is checked - presumably until DML execution is supported; confirm.
+ .close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index b41db0e..8eec17f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -34,7 +34,7 @@ import org.apache.calcite.util.NlsString;
import org.apache.calcite.util.Util;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.phoenix.calcite.rel.PhoenixRel.Implementor;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor;
import org.apache.phoenix.expression.AndExpression;
import org.apache.phoenix.expression.CoerceExpression;
import org.apache.phoenix.expression.ComparisonExpression;
@@ -208,7 +208,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.AND, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
try {
return AndExpression.create(convertChildren((RexCall) node, implementor));
} catch (SQLException e) {
@@ -220,7 +220,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.OR, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
return new OrExpression(convertChildren((RexCall) node, implementor));
}
@@ -228,7 +228,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.EQUALS, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
try {
return ComparisonExpression.create(CompareOp.EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -241,7 +241,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.NOT_EQUALS, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
try {
return ComparisonExpression.create(CompareOp.NOT_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -254,7 +254,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.GREATER_THAN, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
try {
return ComparisonExpression.create(CompareOp.GREATER, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -267,7 +267,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.GREATER_THAN_OR_EQUAL, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
try {
return ComparisonExpression.create(CompareOp.GREATER_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -280,7 +280,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.LESS_THAN, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
try {
return ComparisonExpression.create(CompareOp.LESS, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -293,7 +293,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.LESS_THAN_OR_EQUAL, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
try {
return ComparisonExpression.create(CompareOp.LESS_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -307,7 +307,7 @@ public class CalciteUtils {
@SuppressWarnings("rawtypes")
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
try {
List<Expression> children = convertChildren((RexCall) node, implementor);
Expression expr = null;
@@ -372,7 +372,7 @@ public class CalciteUtils {
@SuppressWarnings("rawtypes")
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
try {
List<Expression> children = convertChildren((RexCall) node, implementor);
Expression expr = null;
@@ -485,7 +485,7 @@ public class CalciteUtils {
@SuppressWarnings("rawtypes")
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
try {
List<Expression> children = convertChildren((RexCall) node, implementor);
Expression expr = null;
@@ -532,7 +532,7 @@ public class CalciteUtils {
@SuppressWarnings("rawtypes")
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
try {
List<Expression> children = convertChildren((RexCall) node, implementor);
Expression expr = null;
@@ -579,7 +579,7 @@ public class CalciteUtils {
@SuppressWarnings("rawtypes")
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
RexLiteral lit = (RexLiteral) node;
PDataType targetType = sqlTypeNameToPDataType(node.getType().getSqlTypeName());
Object o = lit.getValue();
@@ -599,7 +599,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.INPUT_REF, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
RexInputRef ref = (RexInputRef) node;
int index = ref.getIndex();
return implementor.newColumnExpression(index);
@@ -609,7 +609,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.FIELD_ACCESS, new ExpressionFactory() {
@SuppressWarnings("rawtypes")
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
RexFieldAccess fieldAccess = (RexFieldAccess) node;
RexNode refExpr = fieldAccess.getReferenceExpr();
if (refExpr.getKind() != SqlKind.CORREL_VARIABLE) {
@@ -626,7 +626,7 @@ public class CalciteUtils {
@SuppressWarnings("rawtypes")
@Override
public Expression newExpression(RexNode node,
- Implementor implementor) {
+ PhoenixRelImplementor implementor) {
List<Expression> children = convertChildren((RexCall) node, implementor);
PDataType targetType = sqlTypeNameToPDataType(node.getType().getSqlTypeName());
try {
@@ -639,7 +639,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.OTHER_FUNCTION, new ExpressionFactory() {
@Override
public Expression newExpression(RexNode node,
- Implementor implementor) {
+ PhoenixRelImplementor implementor) {
RexCall call = (RexCall) node;
List<Expression> children = convertChildren(call, implementor);
SqlOperator op = call.getOperator();
@@ -675,32 +675,32 @@ public class CalciteUtils {
});
EXPRESSION_MAP.put(SqlKind.NOT, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
return new NotExpression(convertChildren((RexCall) node, implementor));
}
});
EXPRESSION_MAP.put(SqlKind.IS_TRUE, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
List<Expression> children = convertChildren((RexCall) node, implementor);
return children.get(0);
}
});
EXPRESSION_MAP.put(SqlKind.IS_NOT_TRUE, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
return new NotExpression(convertChildren((RexCall) node, implementor));
}
});
EXPRESSION_MAP.put(SqlKind.IS_FALSE, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
return new NotExpression(convertChildren((RexCall) node, implementor));
}
});
EXPRESSION_MAP.put(SqlKind.IS_NOT_FALSE, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
List<Expression> children = convertChildren((RexCall) node, implementor);
return children.get(0);
}
@@ -708,26 +708,26 @@ public class CalciteUtils {
//TODO different kind of LikeExpression based on configuration
EXPRESSION_MAP.put(SqlKind.LIKE, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
List<Expression> children = convertChildren((RexCall) node, implementor);
return new StringBasedLikeExpression(children);
}
});
EXPRESSION_MAP.put(SqlKind.IS_NULL, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
return new IsNullExpression(convertChildren((RexCall) node, implementor), false);
}
});
EXPRESSION_MAP.put(SqlKind.IS_NOT_NULL, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
return new IsNullExpression(convertChildren((RexCall) node, implementor), true);
}
});
EXPRESSION_MAP.put(SqlKind.TRIM, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
//TODO Phoenix only support separate arguments.
try {
return new TrimFunction(convertChildren((RexCall) node, implementor));
@@ -739,7 +739,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.CEIL, new ExpressionFactory() {
@SuppressWarnings("rawtypes")
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
//TODO Phoenix only support separate arguments.
List<Expression> children = convertChildren((RexCall) node, implementor);
final Expression firstChild = children.get(0);
@@ -763,7 +763,7 @@ public class CalciteUtils {
EXPRESSION_MAP.put(SqlKind.FLOOR, new ExpressionFactory() {
@SuppressWarnings("rawtypes")
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
// TODO Phoenix only support separate arguments.
List<Expression> children = convertChildren((RexCall) node, implementor);
final Expression firstChild = children.get(0);
@@ -783,7 +783,7 @@ public class CalciteUtils {
});
EXPRESSION_MAP.put(SqlKind.CURRENT_VALUE, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
RexCall call = (RexCall) node;
RexLiteral operand = (RexLiteral) call.getOperands().get(0);
List<String> name = Util.stringToList((String) operand.getValue2());
@@ -794,7 +794,7 @@ public class CalciteUtils {
});
EXPRESSION_MAP.put(SqlKind.NEXT_VALUE, new ExpressionFactory() {
@Override
- public Expression newExpression(RexNode node, Implementor implementor) {
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
RexCall call = (RexCall) node;
RexLiteral operand = (RexLiteral) call.getOperands().get(0);
List<String> name = Util.stringToList((String) operand.getValue2());
@@ -857,7 +857,7 @@ public class CalciteUtils {
});
}
- private static List<Expression> convertChildren(RexCall call, Implementor implementor) {
+ private static List<Expression> convertChildren(RexCall call, PhoenixRelImplementor implementor) {
List<Expression> children = Lists.newArrayListWithExpectedSize(call.getOperands().size());
for (RexNode op : call.getOperands()) {
Expression child = getFactory(op).newExpression(op, implementor);
@@ -867,7 +867,7 @@ public class CalciteUtils {
}
@SuppressWarnings("rawtypes")
- private static Expression cast(PDataType targetDataType, Expression childExpr, Implementor implementor) throws SQLException {
+ private static Expression cast(PDataType targetDataType, Expression childExpr, PhoenixRelImplementor implementor) throws SQLException {
PDataType fromDataType = childExpr.getDataType();
Expression expr = childExpr;
@@ -921,13 +921,13 @@ public class CalciteUtils {
return true;
}
- public static Expression toExpression(RexNode node, Implementor implementor) {
+ public static Expression toExpression(RexNode node, PhoenixRelImplementor implementor) {
ExpressionFactory eFactory = getFactory(node);
Expression expression = eFactory.newExpression(node, implementor);
return expression;
}
- public static AggregateFunction toAggregateFunction(SqlAggFunction aggFunc, List<Integer> args, Implementor implementor) {
+ public static AggregateFunction toAggregateFunction(SqlAggFunction aggFunc, List<Integer> args, PhoenixRelImplementor implementor) {
FunctionFactory fFactory = getFactory(aggFunc);
List<Expression> exprs = Lists.newArrayListWithExpectedSize(args.size());
for (Integer index : args) {
@@ -938,7 +938,7 @@ public class CalciteUtils {
}
public static interface ExpressionFactory {
- public Expression newExpression(RexNode node, Implementor implementor);
+ public Expression newExpression(RexNode node, PhoenixRelImplementor implementor);
}
public static interface FunctionFactory {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
index 6d53db5..023e9f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Sets;
* Implementation of {@link org.apache.calcite.rel.core.Aggregate}
* relational expression in Phoenix.
*/
-abstract public class PhoenixAbstractAggregate extends Aggregate implements PhoenixRel {
+abstract public class PhoenixAbstractAggregate extends Aggregate implements PhoenixQueryRel {
public static boolean isSingleValueCheckAggregate(Aggregate aggregate) {
List<AggregateCall> aggCalls = aggregate.getAggCallList();
@@ -139,7 +139,7 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe
return ImmutableIntList.copyOf(columnRefList);
}
- protected GroupBy getGroupBy(Implementor implementor) {
+ protected GroupBy getGroupBy(PhoenixRelImplementor implementor) {
if (groupSets.size() > 1) {
throw new UnsupportedOperationException();
}
@@ -161,7 +161,7 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe
return new GroupBy.GroupByBuilder().setIsOrderPreserving(isOrderedGroupBy).setExpressions(keyExprs).setKeyExpressions(keyExprs).build();
}
- protected void serializeAggregators(Implementor implementor, StatementContext context, boolean isEmptyGroupBy) {
+ protected void serializeAggregators(PhoenixRelImplementor implementor, StatementContext context, boolean isEmptyGroupBy) {
// TODO sort aggFuncs. same problem with group by key sorting.
List<SingleAggregateFunction> aggFuncs = Lists.newArrayList();
for (AggregateCall call : aggCalls) {
@@ -177,7 +177,7 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe
context.getAggregationManager().setAggregators(clientAggregators);
}
- protected static QueryPlan wrapWithProject(Implementor implementor, QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> aggFuncs) {
+ protected static QueryPlan wrapWithProject(PhoenixRelImplementor implementor, QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> aggFuncs) {
List<Expression> exprs = Lists.newArrayList();
for (int i = 0; i < keyExpressions.size(); i++) {
Expression keyExpr = keyExpressions.get(i);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
index 61e6768..93960bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
@@ -22,7 +22,7 @@ import org.apache.phoenix.expression.LiteralExpression;
* Implementation of {@link org.apache.calcite.rel.core.Join}
* relational expression in Phoenix.
*/
-abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel {
+abstract public class PhoenixAbstractJoin extends Join implements PhoenixQueryRel {
public final JoinInfo joinInfo;
public final boolean isSingleValueRhs;
@@ -49,10 +49,10 @@ abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel {
: ImmutableIntList.identity(getRight().getRowType().getFieldCount());
}
- protected QueryPlan implementInput(Implementor implementor, int index, List<Expression> conditionExprs) {
+ protected QueryPlan implementInput(PhoenixRelImplementor implementor, int index, List<Expression> conditionExprs) {
assert index <= 1;
- PhoenixRel input = index == 0 ? (PhoenixRel) left : (PhoenixRel) right;
+ PhoenixQueryRel input = index == 0 ? (PhoenixQueryRel) left : (PhoenixQueryRel) right;
QueryPlan plan = implementor.visitInput(0, input);
if (conditionExprs != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
index 6e09fa5..d353dec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
@@ -26,7 +26,7 @@ import com.google.common.collect.Lists;
* Implementation of {@link org.apache.calcite.rel.core.Project}
* relational expression in Phoenix.
*/
-abstract public class PhoenixAbstractProject extends Project implements PhoenixRel {
+abstract public class PhoenixAbstractProject extends Project implements PhoenixQueryRel {
protected PhoenixAbstractProject(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
super(cluster, traits, input, projects, rowType);
}
@@ -49,7 +49,7 @@ abstract public class PhoenixAbstractProject extends Project implements PhoenixR
return ImmutableIntList.copyOf(bitSet.asList());
}
- protected TupleProjector project(Implementor implementor) {
+ protected TupleProjector project(PhoenixRelImplementor implementor) {
List<Expression> exprs = Lists.newArrayList();
for (RexNode project : getProjects()) {
exprs.add(CalciteUtils.toExpression(project, implementor));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
index 5a835bc..ea159c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
@@ -13,7 +13,7 @@ import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
-abstract public class PhoenixAbstractSemiJoin extends SemiJoin implements PhoenixRel {
+abstract public class PhoenixAbstractSemiJoin extends SemiJoin implements PhoenixQueryRel {
protected PhoenixAbstractSemiJoin(RelOptCluster cluster, RelTraitSet traitSet,
RelNode left, RelNode right, RexNode condition,
@@ -29,10 +29,10 @@ abstract public class PhoenixAbstractSemiJoin extends SemiJoin implements Phoeni
: rightKeys;
}
- protected QueryPlan implementInput(Implementor implementor, int index, List<Expression> conditionExprs) {
+ protected QueryPlan implementInput(PhoenixRelImplementor implementor, int index, List<Expression> conditionExprs) {
assert index <= 1;
- PhoenixRel input = index == 0 ? (PhoenixRel) left : (PhoenixRel) right;
+ PhoenixQueryRel input = index == 0 ? (PhoenixQueryRel) left : (PhoenixQueryRel) right;
ImmutableIntList keys = index == 0 ? leftKeys : rightKeys;
QueryPlan plan = implementor.visitInput(0, input);
for (Iterator<Integer> iter = keys.iterator(); iter.hasNext();) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
index b8c0136..724313e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Lists;
*
* <p>Like {@code Sort}, it also supports LIMIT and OFFSET.
*/
-abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
+abstract public class PhoenixAbstractSort extends Sort implements PhoenixQueryRel {
protected PhoenixAbstractSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation) {
super(cluster, traits, child, collation, null, null);
@@ -44,7 +44,7 @@ abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
}
- protected static OrderBy getOrderBy(RelCollation collation, Implementor implementor, TupleProjector tupleProjector) {
+ protected static OrderBy getOrderBy(RelCollation collation, PhoenixRelImplementor implementor, TupleProjector tupleProjector) {
List<OrderByExpression> orderByExpressions = Lists.newArrayList();
for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
Expression expr = tupleProjector == null ?
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
index 164d486..7f336fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
@@ -58,9 +58,9 @@ public class PhoenixClientAggregate extends PhoenixAbstractAggregate {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
implementor.popContext();
TableRef tableRef = implementor.getTableMapping().getTableRef();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
index cb7dc54..ba7900b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.calcite.CalciteUtils;
import org.apache.phoenix.calcite.TableMapping;
import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.FromCompiler;
import org.apache.phoenix.compile.JoinCompiler;
@@ -110,7 +111,7 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
List<Expression> leftExprs = Lists.<Expression> newArrayList();
List<Expression> rightExprs = Lists.<Expression> newArrayList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
index a49ceb5..1c7a6c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
@@ -68,9 +68,9 @@ public class PhoenixClientProject extends PhoenixAbstractProject {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
implementor.popContext();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
index cec1181..a735a49 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
@@ -18,6 +18,7 @@ import org.apache.calcite.util.ImmutableIntList;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.phoenix.calcite.TableMapping;
import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
import org.apache.phoenix.compile.ColumnResolver;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.SequenceManager;
@@ -31,8 +32,7 @@ import org.apache.phoenix.schema.TableRef;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
-public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin implements
- PhoenixRel {
+public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin {
public static PhoenixClientSemiJoin create(
final RelNode left, final RelNode right, RexNode condition) {
@@ -94,7 +94,7 @@ public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin implements
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
List<Expression> leftExprs = Lists.<Expression> newArrayList();
List<Expression> rightExprs = Lists.<Expression> newArrayList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
index 16c0a81..c021ccf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
@@ -53,11 +53,11 @@ public class PhoenixClientSort extends PhoenixAbstractSort {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
if (this.offset != null)
throw new UnsupportedOperationException();
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
TableRef tableRef = implementor.getTableMapping().getTableRef();
PhoenixStatement stmt = plan.getContext().getStatement();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
index 8d68a57..35514fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
@@ -48,11 +48,11 @@ public class PhoenixCompactClientSort extends PhoenixAbstractSort {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
if (this.offset != null)
throw new UnsupportedOperationException();
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
assert plan instanceof TupleProjectionPlan;
// PhoenixServerAggregate wraps the AggregatePlan with a TupleProjectionPlan,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java
index 5006f43..cfcdc65 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java
@@ -18,7 +18,10 @@ public enum PhoenixConvention implements Convention {
SERVERJOIN,
/** Client convention*/
- CLIENT;
+ CLIENT,
+
+ /** Mutation convention*/
+ MUTATION;
@Override
public RelTraitDef<?> getTraitDef() {
@@ -27,7 +30,7 @@ public enum PhoenixConvention implements Convention {
@Override
public boolean satisfies(RelTrait trait) {
- return this == trait || trait == GENERIC;
+ return this == trait || (this != MUTATION && trait == GENERIC);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
index d70c5be..038af58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
@@ -20,6 +20,7 @@ import org.apache.phoenix.calcite.CalciteUtils;
import org.apache.phoenix.calcite.CorrelateVariableImpl;
import org.apache.phoenix.calcite.TableMapping;
import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
import org.apache.phoenix.compile.JoinCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.CorrelatePlan;
@@ -28,7 +29,7 @@ import org.apache.phoenix.schema.PTable;
import com.google.common.base.Supplier;
-public class PhoenixCorrelate extends Correlate implements PhoenixRel {
+public class PhoenixCorrelate extends Correlate implements PhoenixQueryRel {
public static PhoenixCorrelate create(final RelNode left, final RelNode right,
CorrelationId correlationId, ImmutableBitSet requiredColumns,
@@ -71,16 +72,16 @@ public class PhoenixCorrelate extends Correlate implements PhoenixRel {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns, true, ImmutableIntList.identity(getLeft().getRowType().getFieldCount())));
- QueryPlan leftPlan = implementor.visitInput(0, (PhoenixRel) getLeft());
+ QueryPlan leftPlan = implementor.visitInput(0, (PhoenixQueryRel) getLeft());
PTable leftTable = implementor.getTableMapping().getPTable();
implementor.popContext();
implementor.getRuntimeContext().defineCorrelateVariable(getCorrelVariable(), new CorrelateVariableImpl(implementor.getTableMapping()));
implementor.pushContext(new ImplementorContext(false, true, ImmutableIntList.identity(getRight().getRowType().getFieldCount())));
- QueryPlan rightPlan = implementor.visitInput(1, (PhoenixRel) getRight());
+ QueryPlan rightPlan = implementor.visitInput(1, (PhoenixQueryRel) getRight());
PTable rightTable = implementor.getTableMapping().getPTable();
implementor.popContext();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
index 4f7f4dd..d76a104 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
@@ -28,7 +28,7 @@ import com.google.common.base.Supplier;
* Implementation of {@link org.apache.calcite.rel.core.Filter}
* relational expression in Phoenix.
*/
-public class PhoenixFilter extends Filter implements PhoenixRel {
+public class PhoenixFilter extends Filter implements PhoenixQueryRel {
public static PhoenixFilter create(final RelNode input, final RexNode condition) {
final RelOptCluster cluster = input.getCluster();
@@ -60,12 +60,12 @@ public class PhoenixFilter extends Filter implements PhoenixRel {
return super.computeSelfCost(planner, mq).multiplyBy(PHOENIX_FACTOR);
}
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList;
ImmutableBitSet bitSet = InputFinder.analyze(condition).inputBitSet.addAll(columnRefList).build();
columnRefList = ImmutableIntList.copyOf(bitSet.asList());
implementor.pushContext(implementor.getCurrentContext().withColumnRefList(columnRefList));
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
implementor.popContext();
Expression expr = CalciteUtils.toExpression(condition, implementor);
return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(),
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
index 0d77f86..50b6f2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
@@ -22,7 +22,7 @@ import org.apache.phoenix.execute.ClientScanPlan;
import com.google.common.base.Supplier;
-public class PhoenixLimit extends SingleRel implements PhoenixRel {
+public class PhoenixLimit extends SingleRel implements PhoenixQueryRel {
public final RexNode offset;
public final RexNode fetch;
@@ -81,8 +81,8 @@ public class PhoenixLimit extends SingleRel implements PhoenixRel {
}
@Override
- public QueryPlan implement(Implementor implementor) {
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
int fetchValue = RexLiteral.intValue(fetch);
if (plan.getLimit() == null) {
return plan.limit(fetchValue);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
index f48d1ef..f983439 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
@@ -23,7 +23,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-public class PhoenixMergeSortUnion extends Union implements PhoenixRel {
+public class PhoenixMergeSortUnion extends Union implements PhoenixQueryRel {
public final RelCollation collation;
public static PhoenixMergeSortUnion create(final List<RelNode> inputs,
@@ -65,10 +65,10 @@ public class PhoenixMergeSortUnion extends Union implements PhoenixRel {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size());
for (Ord<RelNode> input : Ord.zip(inputs)) {
- subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e));
+ subPlans.add(implementor.visitInput(input.i, (PhoenixQueryRel) input.e));
}
final OrderBy orderBy = PhoenixAbstractSort.getOrderBy(collation, implementor, null);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixQueryRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixQueryRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixQueryRel.java
new file mode 100644
index 0000000..aaeb156
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixQueryRel.java
@@ -0,0 +1,7 @@
+package org.apache.phoenix.calcite.rel;
+
+import org.apache.phoenix.compile.QueryPlan;
+
+public interface PhoenixQueryRel extends PhoenixRel {
+ QueryPlan implement(PhoenixRelImplementor implementor);
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
index 5d8b7b0..7f52837 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -1,22 +1,9 @@
package org.apache.phoenix.calcite.rel;
-import java.util.List;
-
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.metadata.RelMetadataProvider;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.phoenix.calcite.PhoenixSequence;
-import org.apache.phoenix.calcite.TableMapping;
import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.SequenceValueExpression;
-import org.apache.phoenix.execute.RuntimeContext;
-import org.apache.phoenix.execute.TupleProjector;
-import org.apache.phoenix.expression.ColumnExpression;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.parse.SequenceValueParseNode;
-import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.compile.StatementPlan;
/**
* Relational expression in Phoenix.
@@ -46,39 +33,5 @@ public interface PhoenixRel extends RelNode {
*/
double SERVER_FACTOR = 0.2;
- QueryPlan implement(Implementor implementor);
-
- class ImplementorContext {
- public final boolean retainPKColumns;
- public final boolean forceProject;
- public final ImmutableIntList columnRefList;
-
- public ImplementorContext(boolean retainPKColumns, boolean forceProject, ImmutableIntList columnRefList) {
- this.retainPKColumns = retainPKColumns;
- this.forceProject = forceProject;
- this.columnRefList = columnRefList;
- }
-
- public ImplementorContext withColumnRefList(ImmutableIntList columnRefList) {
- return new ImplementorContext(this.retainPKColumns, this.forceProject, columnRefList);
- }
- }
-
- /** Holds context for an traversal over a tree of relational expressions
- * to convert it to an executable plan. */
- interface Implementor {
- QueryPlan visitInput(int i, PhoenixRel input);
- ColumnExpression newColumnExpression(int index);
- @SuppressWarnings("rawtypes")
- Expression newFieldAccessExpression(String variableId, int index, PDataType type);
- SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op);
- RuntimeContext getRuntimeContext();
- void setTableMapping(TableMapping tableMapping);
- TableMapping getTableMapping();
- void setSequenceManager(SequenceManager sequenceManager);
- void pushContext(ImplementorContext context);
- ImplementorContext popContext();
- ImplementorContext getCurrentContext();
- TupleProjector project(List<Expression> exprs);
- }
+ StatementPlan implement(PhoenixRelImplementor implementor);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
new file mode 100644
index 0000000..b2f275c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
@@ -0,0 +1,50 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.phoenix.calcite.PhoenixSequence;
+import org.apache.phoenix.calcite.TableMapping;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.SequenceValueExpression;
+import org.apache.phoenix.execute.RuntimeContext;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.SequenceValueParseNode;
+import org.apache.phoenix.schema.types.PDataType;
+
+/** Holds context for a traversal over a tree of relational expressions
+ * to convert it to an executable plan. */
+public interface PhoenixRelImplementor {
+ QueryPlan visitInput(int i, PhoenixQueryRel input);
+ ColumnExpression newColumnExpression(int index);
+ @SuppressWarnings("rawtypes")
+ Expression newFieldAccessExpression(String variableId, int index, PDataType type);
+ SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op);
+ RuntimeContext getRuntimeContext();
+ void setTableMapping(TableMapping tableMapping);
+ TableMapping getTableMapping();
+ void setSequenceManager(SequenceManager sequenceManager);
+ void pushContext(ImplementorContext context);
+ ImplementorContext popContext();
+ ImplementorContext getCurrentContext();
+ TupleProjector project(List<Expression> exprs);
+
+ class ImplementorContext {
+ public final boolean retainPKColumns;
+ public final boolean forceProject;
+ public final ImmutableIntList columnRefList;
+
+ public ImplementorContext(boolean retainPKColumns, boolean forceProject, ImmutableIntList columnRefList) {
+ this.retainPKColumns = retainPKColumns;
+ this.forceProject = forceProject;
+ this.columnRefList = columnRefList;
+ }
+
+ public ImplementorContext withColumnRefList(ImmutableIntList columnRefList) {
+ return new ImplementorContext(this.retainPKColumns, this.forceProject, columnRefList);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
index 759f09b..f76ab35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
@@ -7,7 +7,6 @@ import java.util.Stack;
import org.apache.phoenix.calcite.PhoenixSequence;
import org.apache.phoenix.calcite.TableMapping;
-import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.SequenceManager;
import org.apache.phoenix.compile.SequenceValueExpression;
@@ -31,7 +30,7 @@ import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.types.PDataType;
import com.google.common.collect.Lists;
-public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
+public class PhoenixRelImplementorImpl implements PhoenixRelImplementor {
private final RuntimeContext runtimeContext;
private Stack<ImplementorContext> contextStack;
private SequenceManager sequenceManager;
@@ -43,7 +42,7 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
}
@Override
- public QueryPlan visitInput(int i, PhoenixRel input) {
+ public QueryPlan visitInput(int i, PhoenixQueryRel input) {
return input.implement(this);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
index 68b7f04..65cca2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
@@ -54,9 +54,9 @@ public class PhoenixServerAggregate extends PhoenixAbstractAggregate {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
implementor.popContext();
assert (plan instanceof ScanPlan
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
index 8690377..acb14ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -19,6 +19,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.phoenix.calcite.CalciteUtils;
import org.apache.phoenix.calcite.TableMapping;
import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
import org.apache.phoenix.compile.JoinCompiler;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.HashJoinPlan;
@@ -105,7 +106,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
List<Expression> leftExprs = Lists.<Expression> newArrayList();
List<Expression> rightExprs = Lists.<Expression> newArrayList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
index ba0854b..7932c3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
@@ -13,6 +13,7 @@ import org.apache.calcite.rel.metadata.RelMdCollation;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.ScanPlan;
import org.apache.phoenix.execute.TupleProjector;
@@ -58,9 +59,9 @@ public class PhoenixServerProject extends PhoenixAbstractProject {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns, false, getColumnRefList()));
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
implementor.popContext();
assert (plan instanceof ScanPlan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
index 2b133aa..6abb2c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
@@ -17,6 +17,7 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.phoenix.calcite.TableMapping;
import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.expression.Expression;
@@ -92,7 +93,7 @@ public class PhoenixServerSemiJoin extends PhoenixAbstractSemiJoin {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
List<Expression> leftExprs = Lists.<Expression> newArrayList();
List<Expression> rightExprs = Lists.<Expression> newArrayList();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
index 3053cca..9ace803 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
@@ -49,11 +49,11 @@ public class PhoenixServerSort extends PhoenixAbstractSort {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
if (this.offset != null)
throw new UnsupportedOperationException();
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
assert (plan instanceof ScanPlan
|| plan instanceof HashJoinPlan)
&& plan.getLimit() == null;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
new file mode 100644
index 0000000..d5e4836
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -0,0 +1,131 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare.CatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+
+import com.google.common.collect.Lists;
+
+public class PhoenixTableModify extends TableModify implements PhoenixRel {
+ // Phoenix counterpart of Calcite's TableModify; implement() currently handles INSERT only and yields a MutationPlan.
+ public PhoenixTableModify(RelOptCluster cluster, RelTraitSet traits,
+ RelOptTable table, CatalogReader catalogReader, RelNode child,
+ Operation operation, List<String> updateColumnList, boolean flattened) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened);
+ }
+ // copy(): rebuilds this node with the given trait set and the single input; every other attribute is carried over.
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new PhoenixTableModify(
+ getCluster(),
+ traitSet,
+ getTable(),
+ getCatalogReader(),
+ sole(inputs),
+ getOperation(),
+ getUpdateColumnList(),
+ isFlattened());
+ }
+ // implement(): visits input 0 to obtain its QueryPlan and wraps it in an anonymous MutationPlan.
+ @Override
+ public StatementPlan implement(PhoenixRelImplementor implementor) {
+ if (getOperation() != Operation.INSERT) { // any operation other than INSERT is rejected
+ throw new UnsupportedOperationException();
+ }
+ // queryPlan is the plan of the input rel; targetTableRef is taken from the PhoenixTable's tableMapping.
+ final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input);
+ final TableRef targetTableRef = getTable().unwrap(PhoenixTable.class).tableMapping.getTableRef();
+ return new MutationPlan() { // parameter metadata, context and source refs below all delegate to queryPlan
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return queryPlan.getContext().getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return queryPlan.getContext();
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return targetTableRef;
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ // TODO return originalQueryPlan.getSourceRefs();
+ return queryPlan.getSourceRefs();
+ }
+ // Reported as Phoenix UPSERT although the operation checked in implement() is INSERT; fully qualified, presumably to avoid clashing with that Operation type - TODO confirm.
+ @Override
+ public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
+ return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
+ }
+ // NOTE(review): execute() is a stub - the intended logic is commented out below and the method returns null.
+ @Override
+ public MutationState execute() throws SQLException {
+// ResultIterator iterator = queryPlan.iterator();
+// if (parallelIteratorFactory == null) {
+// return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp);
+// }
+// try {
+// parallelIteratorFactory.setRowProjector(projector);
+// parallelIteratorFactory.setColumnIndexes(columnIndexes);
+// parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+// Tuple tuple;
+// long totalRowCount = 0;
+// StatementContext context = queryPlan.getContext();
+// while ((tuple=iterator.next()) != null) {// Runs query
+// Cell kv = tuple.getValue(0);
+// totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+// }
+// // Return total number of rows that have been updated. In the case of auto commit being off
+// // the mutations will all be in the mutation state of the current connection.
+// MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount);
+// /*
+// * All the metrics collected for measuring the reads done by the parallel mutating iterators
+// * is included in the ReadMetricHolder of the statement context. Include these metrics in the
+// * returned mutation state so they can be published on commit.
+// */
+// mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+// return mutationState;
+// } finally {
+// iterator.close();
+// }
+ return null; // placeholder: no MutationState is produced until the logic above is implemented
+ }
+ // Explain output is the line "UPSERT SELECT" followed by the plan steps of the input query plan.
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("UPSERT SELECT");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index e9324e2..b4d64b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -54,7 +54,7 @@ import com.google.common.collect.ImmutableList.Builder;
/**
* Scan of a Phoenix table.
*/
-public class PhoenixTableScan extends TableScan implements PhoenixRel {
+public class PhoenixTableScan extends TableScan implements PhoenixQueryRel {
public enum ScanOrder {
NONE,
FORWARD,
@@ -123,7 +123,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
// time when the correlate variable has not been defined yet.
// 2) get a guess of ScanRange even if the runtime value is absent.
// 3) test whether this dynamic filter is worth a recompile at runtime.
- Implementor tmpImplementor = new PhoenixRelImplementorImpl(null) {
+ PhoenixRelImplementor tmpImplementor = new PhoenixRelImplementorImpl(null) {
@SuppressWarnings("rawtypes")
@Override
public Expression newFieldAccessExpression(String variableId, int index, PDataType type) {
@@ -244,7 +244,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
TableMapping tableMapping = phoenixTable.tableMapping;
implementor.setTableMapping(tableMapping);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java
index 33aa797..596212f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java
@@ -38,7 +38,7 @@ public class PhoenixTemporarySort extends PhoenixAbstractSort {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
throw new UnsupportedOperationException();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
index 677baa5..5e750b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
@@ -25,10 +25,11 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.phoenix.calcite.BuiltInMethod;
-import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementPlan;
import org.apache.phoenix.execute.DelegateQueryPlan;
import org.apache.phoenix.execute.RuntimeContextImpl;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
@@ -72,7 +73,7 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume
// ResultIterator iterator = root.get("x");
// return CalciteRuntime.toEnumerable(iterator);
final BlockBuilder list = new BlockBuilder();
- QueryPlan plan = makePlan((PhoenixRel)getInput());
+ StatementPlan plan = makePlan((PhoenixRel)getInput());
Expression var = stash(implementor, plan, QueryPlan.class);
final RelDataType rowType = getRowType();
final PhysType physType =
@@ -89,11 +90,15 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume
return implementor.result(physType, list.toBlock());
}
- static QueryPlan makePlan(PhoenixRel rel) {
- final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl(new RuntimeContextImpl());
+ static StatementPlan makePlan(PhoenixRel rel) {
+ final PhoenixRelImplementor phoenixImplementor = new PhoenixRelImplementorImpl(new RuntimeContextImpl());
phoenixImplementor.pushContext(new ImplementorContext(true, false, ImmutableIntList.identity(rel.getRowType().getFieldCount())));
- final QueryPlan plan = phoenixImplementor.visitInput(0, rel);
- return new DelegateQueryPlan(plan) {
+ final StatementPlan plan = rel.implement(phoenixImplementor);
+ if (!(plan instanceof QueryPlan)) {
+ return plan;
+ }
+
+ return new DelegateQueryPlan((QueryPlan) plan) {
@Override
public ResultIterator iterator() throws SQLException {
return iterator(DefaultParallelScanGrouper.getInstance());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
index f591fde..1e21054 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
@@ -21,7 +21,7 @@ import org.apache.phoenix.schema.types.PInteger;
import com.google.common.collect.Lists;
-public class PhoenixUncollect extends Uncollect implements PhoenixRel {
+public class PhoenixUncollect extends Uncollect implements PhoenixQueryRel {
public static PhoenixUncollect create(RelNode input, boolean withOrdinality) {
RelOptCluster cluster = input.getCluster();
@@ -49,8 +49,8 @@ public class PhoenixUncollect extends Uncollect implements PhoenixRel {
}
@Override
- public QueryPlan implement(Implementor implementor) {
- QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
+ QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
Expression arrayExpression = implementor.newColumnExpression(0);
@SuppressWarnings("rawtypes")
PDataType baseType = PDataType.fromTypeId(arrayExpression.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
index 57393c8..675abc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Lists;
* Implementation of {@link org.apache.calcite.rel.core.Union}
* relational expression in Phoenix.
*/
-public class PhoenixUnion extends Union implements PhoenixRel {
+public class PhoenixUnion extends Union implements PhoenixQueryRel {
public static PhoenixUnion create(List<RelNode> inputs, boolean all) {
RelOptCluster cluster = inputs.get(0).getCluster();
@@ -52,10 +52,10 @@ public class PhoenixUnion extends Union implements PhoenixRel {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size());
for (Ord<RelNode> input : Ord.zip(inputs)) {
- subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e));
+ subPlans.add(implementor.visitInput(input.i, (PhoenixQueryRel) input.e));
}
return new UnionPlan(subPlans.get(0).getContext(), SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(), RowProjector.EMPTY_PROJECTOR,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
index c0fd272..886e861 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
@@ -55,7 +55,7 @@ import com.google.common.collect.Lists;
* Implementation of {@link org.apache.calcite.rel.core.Values}
* relational expression in Phoenix.
*/
-public class PhoenixValues extends Values implements PhoenixRel {
+public class PhoenixValues extends Values implements PhoenixQueryRel {
private static final PhoenixConnection phoenixConnection;
static {
@@ -107,7 +107,7 @@ public class PhoenixValues extends Values implements PhoenixRel {
}
@Override
- public QueryPlan implement(Implementor implementor) {
+ public QueryPlan implement(PhoenixRelImplementor implementor) {
List<Tuple> literalResult = Lists.newArrayList();
Iterator<ImmutableList<RexLiteral>> iter = getTuples().iterator();
Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index 396974d..c42df99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -32,10 +32,12 @@ import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.rel.logical.LogicalUnion;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rex.RexNode;
import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.PhoenixTable;
import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
@@ -51,6 +53,7 @@ import org.apache.phoenix.calcite.rel.PhoenixServerJoin;
import org.apache.phoenix.calcite.rel.PhoenixServerProject;
import org.apache.phoenix.calcite.rel.PhoenixServerSemiJoin;
import org.apache.phoenix.calcite.rel.PhoenixServerSort;
+import org.apache.phoenix.calcite.rel.PhoenixTableModify;
import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter;
import org.apache.phoenix.calcite.rel.PhoenixUncollect;
import org.apache.phoenix.calcite.rel.PhoenixUnion;
@@ -90,6 +93,7 @@ public class PhoenixConverterRules {
PhoenixValuesRule.INSTANCE,
PhoenixUncollectRule.INSTANCE,
PhoenixCorrelateRule.INSTANCE,
+ PhoenixTableModifyRule.INSTANCE,
};
public static final RelOptRule[] CONVERTIBLE_RULES = {
@@ -114,6 +118,7 @@ public class PhoenixConverterRules {
PhoenixValuesRule.INSTANCE,
PhoenixUncollectRule.INSTANCE,
PhoenixCorrelateRule.INSTANCE,
+ PhoenixTableModifyRule.INSTANCE,
};
/** Base class for planner rules that convert a relational expression to
@@ -816,6 +821,40 @@ public class PhoenixConverterRules {
/**
+ * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalTableModify} to a
+ * {@link PhoenixTableModify}.
+ */
+ public static class PhoenixTableModifyRule extends PhoenixConverterRule {
+ // Single shared instance; the rule matches LogicalTableModify in Convention.NONE and targets PhoenixConvention.CLIENT.
+ private static final PhoenixTableModifyRule INSTANCE = new PhoenixTableModifyRule();
+
+ private PhoenixTableModifyRule() {
+ super(LogicalTableModify.class, Convention.NONE,
+ PhoenixConvention.CLIENT, "PhoenixTableModifyRule");
+ }
+ // convert(): returns null when the modified table cannot be unwrapped to a PhoenixTable; otherwise builds a PhoenixTableModify.
+ public RelNode convert(RelNode rel) {
+ final LogicalTableModify modify = (LogicalTableModify) rel;
+ final PhoenixTable phoenixTable = modify.getTable().unwrap(PhoenixTable.class);
+ if (phoenixTable == null) {
+ return null;
+ }
+ // The new node carries the CLIENT convention, while its input is converted to the GENERIC convention.
+ return new PhoenixTableModify(
+ modify.getCluster(),
+ modify.getTraitSet().replace(PhoenixConvention.CLIENT),
+ modify.getTable(),
+ modify.getCatalogReader(),
+ convert(
+ modify.getInput(),
+ modify.getTraitSet().replace(PhoenixConvention.GENERIC)),
+ modify.getOperation(),
+ modify.getUpdateColumnList(),
+ modify.isFlattened());
+ }
+ }
+
+ /**
* Rule to convert a relational expression from
* {@link org.apache.phoenix.calcite.rel.PhoenixConvention} to
* {@link org.apache.calcite.adapter.enumerable.EnumerableConvention}.