You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/05/03 19:55:08 UTC

[2/2] phoenix git commit: Add PhoenixTableModify and restructure code to introduce MutationPlan (part of PHOENIX-2197 Support DML in Phoenix/Calcite integration)

Add PhoenixTableModify and restructure code to introduce MutationPlan (part of PHOENIX-2197 Support DML in Phoenix/Calcite integration)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3425347e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3425347e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3425347e

Branch: refs/heads/calcite
Commit: 3425347e36801252efb927f1abd2eb6a5175a9af
Parents: 95479c1
Author: maryannxue <ma...@gmail.com>
Authored: Tue May 3 13:54:55 2016 -0400
Committer: maryannxue <ma...@gmail.com>
Committed: Tue May 3 13:54:55 2016 -0400

----------------------------------------------------------------------
 .../apache/phoenix/calcite/BaseCalciteIT.java   |   8 +-
 .../apache/phoenix/calcite/CalciteDMLIT.java    |  29 ++++
 .../apache/phoenix/calcite/CalciteUtils.java    |  72 +++++-----
 .../calcite/rel/PhoenixAbstractAggregate.java   |   8 +-
 .../calcite/rel/PhoenixAbstractJoin.java        |   6 +-
 .../calcite/rel/PhoenixAbstractProject.java     |   4 +-
 .../calcite/rel/PhoenixAbstractSemiJoin.java    |   6 +-
 .../calcite/rel/PhoenixAbstractSort.java        |   4 +-
 .../calcite/rel/PhoenixClientAggregate.java     |   4 +-
 .../phoenix/calcite/rel/PhoenixClientJoin.java  |   3 +-
 .../calcite/rel/PhoenixClientProject.java       |   4 +-
 .../calcite/rel/PhoenixClientSemiJoin.java      |   6 +-
 .../phoenix/calcite/rel/PhoenixClientSort.java  |   4 +-
 .../calcite/rel/PhoenixCompactClientSort.java   |   4 +-
 .../phoenix/calcite/rel/PhoenixConvention.java  |   7 +-
 .../phoenix/calcite/rel/PhoenixCorrelate.java   |   9 +-
 .../phoenix/calcite/rel/PhoenixFilter.java      |   6 +-
 .../phoenix/calcite/rel/PhoenixLimit.java       |   6 +-
 .../calcite/rel/PhoenixMergeSortUnion.java      |   6 +-
 .../phoenix/calcite/rel/PhoenixQueryRel.java    |   7 +
 .../apache/phoenix/calcite/rel/PhoenixRel.java  |  51 +-------
 .../calcite/rel/PhoenixRelImplementor.java      |  50 +++++++
 .../calcite/rel/PhoenixRelImplementorImpl.java  |   5 +-
 .../calcite/rel/PhoenixServerAggregate.java     |   4 +-
 .../phoenix/calcite/rel/PhoenixServerJoin.java  |   3 +-
 .../calcite/rel/PhoenixServerProject.java       |   5 +-
 .../calcite/rel/PhoenixServerSemiJoin.java      |   3 +-
 .../phoenix/calcite/rel/PhoenixServerSort.java  |   4 +-
 .../phoenix/calcite/rel/PhoenixTableModify.java | 131 +++++++++++++++++++
 .../phoenix/calcite/rel/PhoenixTableScan.java   |   6 +-
 .../calcite/rel/PhoenixTemporarySort.java       |   2 +-
 .../rel/PhoenixToEnumerableConverter.java       |  17 ++-
 .../phoenix/calcite/rel/PhoenixUncollect.java   |   6 +-
 .../phoenix/calcite/rel/PhoenixUnion.java       |   6 +-
 .../phoenix/calcite/rel/PhoenixValues.java      |   4 +-
 .../calcite/rules/PhoenixConverterRules.java    |  39 ++++++
 .../phoenix/calcite/ToExpressionTest.java       |   4 +-
 37 files changed, 385 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
index 607a93f..b171714 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/BaseCalciteIT.java
@@ -161,7 +161,6 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT {
             return this;
         }
 
-
         public boolean execute() throws SQLException {
             final Statement statement = start.getConnection().createStatement();
             final boolean execute = statement.execute(sql);
@@ -178,6 +177,13 @@ public class BaseCalciteIT extends BaseClientManagedTimeIT {
             statement.close();
             return list;
         }
+        
+        public Sql executeUpdate() throws SQLException {
+            final Statement statement = start.getConnection().createStatement();
+            statement.executeUpdate(sql);
+            statement.close();
+            return this;
+        }
 
         public void close() {
             start.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
new file mode 100644
index 0000000..eb18a45
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/calcite/CalciteDMLIT.java
@@ -0,0 +1,29 @@
+package org.apache.phoenix.calcite;
+
+import static org.apache.phoenix.util.TestUtil.ATABLE_NAME;
+
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class CalciteDMLIT extends BaseCalciteIT {
+    private static final Properties PROPS = new Properties();
+    
+    @Before
+    public void initTable() throws Exception {
+        final String url = getUrl();
+        ensureTableCreated(url, ATABLE_NAME);
+    }
+
+    @Test
+    public void testUpsertValues() throws Exception {
+        start(PROPS).sql("upsert into atable(organization_id, entity_id) values('1', '1')")
+            .explainIs("PhoenixToEnumerableConverter\n" +
+                       "  PhoenixTableModify(table=[[phoenix, ATABLE]], operation=[INSERT], updateColumnList=[[]], flattened=[false])\n" +
+                       "    PhoenixClientProject(ORGANIZATION_ID=[$0], ENTITY_ID=[$1], A_STRING=[null], B_STRING=[null], A_INTEGER=[null], A_DATE=[null], A_TIME=[null], A_TIMESTAMP=[null], X_DECIMAL=[null], X_LONG=[null], X_INTEGER=[null], Y_INTEGER=[null], A_BYTE=[null], A_SHORT=[null], A_FLOAT=[null], A_DOUBLE=[null], A_UNSIGNED_FLOAT=[null], A_UNSIGNED_DOUBLE=[null])\n" +
+                       "      PhoenixValues(tuples=[[{ '1              ', '1              ' }]])\n")
+            //.executeUpdate()
+            .close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
index b41db0e..8eec17f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/CalciteUtils.java
@@ -34,7 +34,7 @@ import org.apache.calcite.util.NlsString;
 import org.apache.calcite.util.Util;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.phoenix.calcite.rel.PhoenixRel.Implementor;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor;
 import org.apache.phoenix.expression.AndExpression;
 import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.ComparisonExpression;
@@ -208,7 +208,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.AND, new ExpressionFactory() {
 
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 try {
                     return AndExpression.create(convertChildren((RexCall) node, implementor));
                 } catch (SQLException e) {
@@ -220,7 +220,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.OR, new ExpressionFactory() {
 
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 return new OrExpression(convertChildren((RexCall) node, implementor));
             }
             
@@ -228,7 +228,7 @@ public class CalciteUtils {
 		EXPRESSION_MAP.put(SqlKind.EQUALS, new ExpressionFactory() {
 
 			@Override
-			public Expression newExpression(RexNode node, Implementor implementor) {
+			public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
 				ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 				try {
 					return ComparisonExpression.create(CompareOp.EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -241,7 +241,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.NOT_EQUALS, new ExpressionFactory() {
 
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
                     return ComparisonExpression.create(CompareOp.NOT_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -254,7 +254,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.GREATER_THAN, new ExpressionFactory() {
 
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
                     return ComparisonExpression.create(CompareOp.GREATER, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -267,7 +267,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.GREATER_THAN_OR_EQUAL, new ExpressionFactory() {
 
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
                     return ComparisonExpression.create(CompareOp.GREATER_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -280,7 +280,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.LESS_THAN, new ExpressionFactory() {
 
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
                     return ComparisonExpression.create(CompareOp.LESS, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -293,7 +293,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.LESS_THAN_OR_EQUAL, new ExpressionFactory() {
 
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 ImmutableBytesWritable ptr = new ImmutableBytesWritable();
                 try {
                     return ComparisonExpression.create(CompareOp.LESS_OR_EQUAL, convertChildren((RexCall) node, implementor), ptr, implementor.getTableMapping().getPTable().rowKeyOrderOptimizable());
@@ -307,7 +307,7 @@ public class CalciteUtils {
 
             @SuppressWarnings("rawtypes")
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 try {
                     List<Expression> children = convertChildren((RexCall) node, implementor);
                     Expression expr = null;
@@ -372,7 +372,7 @@ public class CalciteUtils {
 
             @SuppressWarnings("rawtypes")
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 try {
                     List<Expression> children = convertChildren((RexCall) node, implementor);
                     Expression expr = null;
@@ -485,7 +485,7 @@ public class CalciteUtils {
 
             @SuppressWarnings("rawtypes")
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 try {
                     List<Expression> children = convertChildren((RexCall) node, implementor);
                     Expression expr = null;
@@ -532,7 +532,7 @@ public class CalciteUtils {
 
             @SuppressWarnings("rawtypes")
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 try {
                     List<Expression> children = convertChildren((RexCall) node, implementor);
                     Expression expr = null;
@@ -579,7 +579,7 @@ public class CalciteUtils {
 
 			@SuppressWarnings("rawtypes")
             @Override
-			public Expression newExpression(RexNode node, Implementor implementor) {
+			public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
 				RexLiteral lit = (RexLiteral) node;
                 PDataType targetType = sqlTypeNameToPDataType(node.getType().getSqlTypeName());
 				Object o = lit.getValue();
@@ -599,7 +599,7 @@ public class CalciteUtils {
 		EXPRESSION_MAP.put(SqlKind.INPUT_REF, new ExpressionFactory() {
 
 			@Override
-			public Expression newExpression(RexNode node, Implementor implementor) {
+			public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
 				RexInputRef ref = (RexInputRef) node;
 				int index = ref.getIndex();
 				return implementor.newColumnExpression(index);
@@ -609,7 +609,7 @@ public class CalciteUtils {
 		EXPRESSION_MAP.put(SqlKind.FIELD_ACCESS, new ExpressionFactory() {
             @SuppressWarnings("rawtypes")
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 RexFieldAccess fieldAccess = (RexFieldAccess) node;
                 RexNode refExpr = fieldAccess.getReferenceExpr();
                 if (refExpr.getKind() != SqlKind.CORREL_VARIABLE) {
@@ -626,7 +626,7 @@ public class CalciteUtils {
             @SuppressWarnings("rawtypes")
             @Override
             public Expression newExpression(RexNode node,
-                    Implementor implementor) {                
+                    PhoenixRelImplementor implementor) {                
                 List<Expression> children = convertChildren((RexCall) node, implementor);
                 PDataType targetType = sqlTypeNameToPDataType(node.getType().getSqlTypeName());
                 try {
@@ -639,7 +639,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.OTHER_FUNCTION, new ExpressionFactory() {
             @Override
             public Expression newExpression(RexNode node,
-                    Implementor implementor) {
+                    PhoenixRelImplementor implementor) {
                 RexCall call = (RexCall) node;
                 List<Expression> children = convertChildren(call, implementor);
                 SqlOperator op = call.getOperator();
@@ -675,32 +675,32 @@ public class CalciteUtils {
 		});
         EXPRESSION_MAP.put(SqlKind.NOT, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 return new NotExpression(convertChildren((RexCall) node, implementor));
             }
         });
         EXPRESSION_MAP.put(SqlKind.IS_TRUE, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 List<Expression> children = convertChildren((RexCall) node, implementor);
                 return children.get(0);
             }
         });
         EXPRESSION_MAP.put(SqlKind.IS_NOT_TRUE, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 return new NotExpression(convertChildren((RexCall) node, implementor));
             }
         });
         EXPRESSION_MAP.put(SqlKind.IS_FALSE, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 return new NotExpression(convertChildren((RexCall) node, implementor));
             }
         });
         EXPRESSION_MAP.put(SqlKind.IS_NOT_FALSE, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 List<Expression> children = convertChildren((RexCall) node, implementor);
                 return children.get(0);
             }
@@ -708,26 +708,26 @@ public class CalciteUtils {
         //TODO different kind of LikeExpression based on configuration
         EXPRESSION_MAP.put(SqlKind.LIKE, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 List<Expression> children = convertChildren((RexCall) node, implementor);
                 return new StringBasedLikeExpression(children);
             }
         });
         EXPRESSION_MAP.put(SqlKind.IS_NULL, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 return new IsNullExpression(convertChildren((RexCall) node, implementor), false);
             }
         });
         EXPRESSION_MAP.put(SqlKind.IS_NOT_NULL, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 return new IsNullExpression(convertChildren((RexCall) node, implementor), true);
             }
         });
         EXPRESSION_MAP.put(SqlKind.TRIM, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 //TODO Phoenix only support separate arguments.
                 try {
                     return new TrimFunction(convertChildren((RexCall) node, implementor));
@@ -739,7 +739,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.CEIL, new ExpressionFactory() {
             @SuppressWarnings("rawtypes")
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 //TODO Phoenix only support separate arguments.
                 List<Expression> children = convertChildren((RexCall) node, implementor);
                 final Expression firstChild = children.get(0);
@@ -763,7 +763,7 @@ public class CalciteUtils {
         EXPRESSION_MAP.put(SqlKind.FLOOR, new ExpressionFactory() {
             @SuppressWarnings("rawtypes")
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 // TODO Phoenix only support separate arguments.
                 List<Expression> children = convertChildren((RexCall) node, implementor);
                 final Expression firstChild = children.get(0);
@@ -783,7 +783,7 @@ public class CalciteUtils {
         });
         EXPRESSION_MAP.put(SqlKind.CURRENT_VALUE, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 RexCall call = (RexCall) node;
                 RexLiteral operand = (RexLiteral) call.getOperands().get(0);
                 List<String> name = Util.stringToList((String) operand.getValue2());
@@ -794,7 +794,7 @@ public class CalciteUtils {
         });
         EXPRESSION_MAP.put(SqlKind.NEXT_VALUE, new ExpressionFactory() {
             @Override
-            public Expression newExpression(RexNode node, Implementor implementor) {
+            public Expression newExpression(RexNode node, PhoenixRelImplementor implementor) {
                 RexCall call = (RexCall) node;
                 RexLiteral operand = (RexLiteral) call.getOperands().get(0);
                 List<String> name = Util.stringToList((String) operand.getValue2());
@@ -857,7 +857,7 @@ public class CalciteUtils {
         });
     }
     
-    private static List<Expression> convertChildren(RexCall call, Implementor implementor) {
+    private static List<Expression> convertChildren(RexCall call, PhoenixRelImplementor implementor) {
         List<Expression> children = Lists.newArrayListWithExpectedSize(call.getOperands().size());
         for (RexNode op : call.getOperands()) {
             Expression child = getFactory(op).newExpression(op, implementor);
@@ -867,7 +867,7 @@ public class CalciteUtils {
     }
     
     @SuppressWarnings("rawtypes")
-    private static Expression cast(PDataType targetDataType, Expression childExpr, Implementor implementor) throws SQLException {
+    private static Expression cast(PDataType targetDataType, Expression childExpr, PhoenixRelImplementor implementor) throws SQLException {
         PDataType fromDataType = childExpr.getDataType();
         
         Expression expr = childExpr;
@@ -921,13 +921,13 @@ public class CalciteUtils {
         return true;
     }
 
-	public static Expression toExpression(RexNode node, Implementor implementor) {
+	public static Expression toExpression(RexNode node, PhoenixRelImplementor implementor) {
 		ExpressionFactory eFactory = getFactory(node);
 		Expression expression = eFactory.newExpression(node, implementor);
 		return expression;
 	}
 	
-	public static AggregateFunction toAggregateFunction(SqlAggFunction aggFunc, List<Integer> args, Implementor implementor) {
+	public static AggregateFunction toAggregateFunction(SqlAggFunction aggFunc, List<Integer> args, PhoenixRelImplementor implementor) {
 	    FunctionFactory fFactory = getFactory(aggFunc);
 	    List<Expression> exprs = Lists.newArrayListWithExpectedSize(args.size());
 	    for (Integer index : args) {
@@ -938,7 +938,7 @@ public class CalciteUtils {
 	}
 	
 	public static interface ExpressionFactory {
-		public Expression newExpression(RexNode node, Implementor implementor);
+		public Expression newExpression(RexNode node, PhoenixRelImplementor implementor);
 	}
 	
 	public static interface FunctionFactory {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
index 6d53db5..023e9f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractAggregate.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Sets;
  * Implementation of {@link org.apache.calcite.rel.core.Aggregate}
  * relational expression in Phoenix.
  */
-abstract public class PhoenixAbstractAggregate extends Aggregate implements PhoenixRel {
+abstract public class PhoenixAbstractAggregate extends Aggregate implements PhoenixQueryRel {
     
     public static boolean isSingleValueCheckAggregate(Aggregate aggregate) {
         List<AggregateCall> aggCalls = aggregate.getAggCallList();
@@ -139,7 +139,7 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe
         return ImmutableIntList.copyOf(columnRefList);
     }
     
-    protected GroupBy getGroupBy(Implementor implementor) {
+    protected GroupBy getGroupBy(PhoenixRelImplementor implementor) {
         if (groupSets.size() > 1) {
             throw new UnsupportedOperationException();
         }
@@ -161,7 +161,7 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe
         return new GroupBy.GroupByBuilder().setIsOrderPreserving(isOrderedGroupBy).setExpressions(keyExprs).setKeyExpressions(keyExprs).build();        
     }
     
-    protected void serializeAggregators(Implementor implementor, StatementContext context, boolean isEmptyGroupBy) {
+    protected void serializeAggregators(PhoenixRelImplementor implementor, StatementContext context, boolean isEmptyGroupBy) {
         // TODO sort aggFuncs. same problem with group by key sorting.
         List<SingleAggregateFunction> aggFuncs = Lists.newArrayList();
         for (AggregateCall call : aggCalls) {
@@ -177,7 +177,7 @@ abstract public class PhoenixAbstractAggregate extends Aggregate implements Phoe
         context.getAggregationManager().setAggregators(clientAggregators);
     }
     
-    protected static QueryPlan wrapWithProject(Implementor implementor, QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> aggFuncs) {
+    protected static QueryPlan wrapWithProject(PhoenixRelImplementor implementor, QueryPlan plan, List<Expression> keyExpressions, List<SingleAggregateFunction> aggFuncs) {
         List<Expression> exprs = Lists.newArrayList();
         for (int i = 0; i < keyExpressions.size(); i++) {
             Expression keyExpr = keyExpressions.get(i);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
index 61e6768..93960bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractJoin.java
@@ -22,7 +22,7 @@ import org.apache.phoenix.expression.LiteralExpression;
  * Implementation of {@link org.apache.calcite.rel.core.Join}
  * relational expression in Phoenix.
  */
-abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel {
+abstract public class PhoenixAbstractJoin extends Join implements PhoenixQueryRel {
     public final JoinInfo joinInfo;
     public final boolean isSingleValueRhs;
 
@@ -49,10 +49,10 @@ abstract public class PhoenixAbstractJoin extends Join implements PhoenixRel {
               : ImmutableIntList.identity(getRight().getRowType().getFieldCount());
     }
     
-    protected QueryPlan implementInput(Implementor implementor, int index, List<Expression> conditionExprs) {
+    protected QueryPlan implementInput(PhoenixRelImplementor implementor, int index, List<Expression> conditionExprs) {
         assert index <= 1;
         
-        PhoenixRel input = index == 0 ? (PhoenixRel) left : (PhoenixRel) right;
+        PhoenixQueryRel input = index == 0 ? (PhoenixQueryRel) left : (PhoenixQueryRel) right;
         QueryPlan plan = implementor.visitInput(0, input);
         
         if (conditionExprs != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
index 6e09fa5..d353dec 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractProject.java
@@ -26,7 +26,7 @@ import com.google.common.collect.Lists;
  * Implementation of {@link org.apache.calcite.rel.core.Project}
  * relational expression in Phoenix.
  */
-abstract public class PhoenixAbstractProject extends Project implements PhoenixRel {
+abstract public class PhoenixAbstractProject extends Project implements PhoenixQueryRel {
     protected PhoenixAbstractProject(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
         super(cluster, traits, input, projects, rowType);
     }
@@ -49,7 +49,7 @@ abstract public class PhoenixAbstractProject extends Project implements PhoenixR
         return ImmutableIntList.copyOf(bitSet.asList());
     }
     
-    protected TupleProjector project(Implementor implementor) {        
+    protected TupleProjector project(PhoenixRelImplementor implementor) {        
         List<Expression> exprs = Lists.newArrayList();
         for (RexNode project : getProjects()) {
             exprs.add(CalciteUtils.toExpression(project, implementor));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
index 5a835bc..ea159c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSemiJoin.java
@@ -13,7 +13,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
 
-abstract public class PhoenixAbstractSemiJoin extends SemiJoin implements PhoenixRel {
+abstract public class PhoenixAbstractSemiJoin extends SemiJoin implements PhoenixQueryRel {
 
     protected PhoenixAbstractSemiJoin(RelOptCluster cluster, RelTraitSet traitSet,
             RelNode left, RelNode right, RexNode condition,
@@ -29,10 +29,10 @@ abstract public class PhoenixAbstractSemiJoin extends SemiJoin implements Phoeni
               : rightKeys;
     }
     
-    protected QueryPlan implementInput(Implementor implementor, int index, List<Expression> conditionExprs) {
+    protected QueryPlan implementInput(PhoenixRelImplementor implementor, int index, List<Expression> conditionExprs) {
         assert index <= 1;
         
-        PhoenixRel input = index == 0 ? (PhoenixRel) left : (PhoenixRel) right;
+        PhoenixQueryRel input = index == 0 ? (PhoenixQueryRel) left : (PhoenixQueryRel) right;
         ImmutableIntList keys = index == 0 ? leftKeys : rightKeys;
         QueryPlan plan = implementor.visitInput(0, input);
         for (Iterator<Integer> iter = keys.iterator(); iter.hasNext();) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
index b8c0136..724313e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixAbstractSort.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Lists;
  *
  * <p>Like {@code Sort}, it also supports LIMIT and OFFSET.
  */
-abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
+abstract public class PhoenixAbstractSort extends Sort implements PhoenixQueryRel {
     
     protected PhoenixAbstractSort(RelOptCluster cluster, RelTraitSet traits, RelNode child, RelCollation collation) {
         super(cluster, traits, child, collation, null, null);
@@ -44,7 +44,7 @@ abstract public class PhoenixAbstractSort extends Sort implements PhoenixRel {
                 Util.nLogN(rowCount) * bytesPerRow, rowCount, 0);
     }
 
-    protected static OrderBy getOrderBy(RelCollation collation, Implementor implementor, TupleProjector tupleProjector) {
+    protected static OrderBy getOrderBy(RelCollation collation, PhoenixRelImplementor implementor, TupleProjector tupleProjector) {
         List<OrderByExpression> orderByExpressions = Lists.newArrayList();
         for (RelFieldCollation fieldCollation : collation.getFieldCollations()) {
             Expression expr = tupleProjector == null ? 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
index 164d486..7f336fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientAggregate.java
@@ -58,9 +58,9 @@ public class PhoenixClientAggregate extends PhoenixAbstractAggregate {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         implementor.popContext();
         
         TableRef tableRef = implementor.getTableMapping().getTableRef();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
index cb7dc54..ba7900b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientJoin.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.FromCompiler;
 import org.apache.phoenix.compile.JoinCompiler;
@@ -110,7 +111,7 @@ public class PhoenixClientJoin extends PhoenixAbstractJoin {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         List<Expression> leftExprs = Lists.<Expression> newArrayList();
         List<Expression> rightExprs = Lists.<Expression> newArrayList();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
index a49ceb5..1c7a6c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientProject.java
@@ -68,9 +68,9 @@ public class PhoenixClientProject extends PhoenixAbstractProject {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         implementor.popContext();
         
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
index cec1181..a735a49 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSemiJoin.java
@@ -18,6 +18,7 @@ import org.apache.calcite.util.ImmutableIntList;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
 import org.apache.phoenix.compile.ColumnResolver;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.SequenceManager;
@@ -31,8 +32,7 @@ import org.apache.phoenix.schema.TableRef;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
-public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin implements
-        PhoenixRel {
+public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin {
     
     public static PhoenixClientSemiJoin create(
             final RelNode left, final RelNode right, RexNode condition) {
@@ -94,7 +94,7 @@ public class PhoenixClientSemiJoin extends PhoenixAbstractSemiJoin implements
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         List<Expression> leftExprs = Lists.<Expression> newArrayList();
         List<Expression> rightExprs = Lists.<Expression> newArrayList();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
index 16c0a81..c021ccf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixClientSort.java
@@ -53,11 +53,11 @@ public class PhoenixClientSort extends PhoenixAbstractSort {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         if (this.offset != null)
             throw new UnsupportedOperationException();
             
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         
         TableRef tableRef = implementor.getTableMapping().getTableRef();
         PhoenixStatement stmt = plan.getContext().getStatement();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
index 8d68a57..35514fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCompactClientSort.java
@@ -48,11 +48,11 @@ public class PhoenixCompactClientSort extends PhoenixAbstractSort {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         if (this.offset != null)
             throw new UnsupportedOperationException();
             
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         assert plan instanceof TupleProjectionPlan;
         
         // PhoenixServerAggregate wraps the AggregatePlan with a TupleProjectionPlan,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java
index 5006f43..cfcdc65 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixConvention.java
@@ -18,7 +18,10 @@ public enum PhoenixConvention implements Convention {
     SERVERJOIN,
     
     /** Client convention*/
-    CLIENT;
+    CLIENT,
+    
+    /** Mutation convention*/
+    MUTATION;
 
     @Override
     public RelTraitDef<?> getTraitDef() {
@@ -27,7 +30,7 @@ public enum PhoenixConvention implements Convention {
 
     @Override
     public boolean satisfies(RelTrait trait) {
-        return this == trait || trait == GENERIC;
+        return this == trait || (this != MUTATION && trait == GENERIC);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
index d70c5be..038af58 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixCorrelate.java
@@ -20,6 +20,7 @@ import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.calcite.CorrelateVariableImpl;
 import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
 import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.CorrelatePlan;
@@ -28,7 +29,7 @@ import org.apache.phoenix.schema.PTable;
 
 import com.google.common.base.Supplier;
 
-public class PhoenixCorrelate extends Correlate implements PhoenixRel {
+public class PhoenixCorrelate extends Correlate implements PhoenixQueryRel {
     
     public static PhoenixCorrelate create(final RelNode left, final RelNode right, 
             CorrelationId correlationId, ImmutableBitSet requiredColumns, 
@@ -71,16 +72,16 @@ public class PhoenixCorrelate extends Correlate implements PhoenixRel {
     }
     
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns, true, ImmutableIntList.identity(getLeft().getRowType().getFieldCount())));
-        QueryPlan leftPlan = implementor.visitInput(0, (PhoenixRel) getLeft());
+        QueryPlan leftPlan = implementor.visitInput(0, (PhoenixQueryRel) getLeft());
         PTable leftTable = implementor.getTableMapping().getPTable();
         implementor.popContext();
 
         implementor.getRuntimeContext().defineCorrelateVariable(getCorrelVariable(), new CorrelateVariableImpl(implementor.getTableMapping()));
 
         implementor.pushContext(new ImplementorContext(false, true, ImmutableIntList.identity(getRight().getRowType().getFieldCount())));
-        QueryPlan rightPlan = implementor.visitInput(1, (PhoenixRel) getRight());
+        QueryPlan rightPlan = implementor.visitInput(1, (PhoenixQueryRel) getRight());
         PTable rightTable = implementor.getTableMapping().getPTable();
         implementor.popContext();
                 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
index 4f7f4dd..d76a104 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixFilter.java
@@ -28,7 +28,7 @@ import com.google.common.base.Supplier;
  * Implementation of {@link org.apache.calcite.rel.core.Filter}
  * relational expression in Phoenix.
  */
-public class PhoenixFilter extends Filter implements PhoenixRel {
+public class PhoenixFilter extends Filter implements PhoenixQueryRel {
     
     public static PhoenixFilter create(final RelNode input, final RexNode condition) {
         final RelOptCluster cluster = input.getCluster();
@@ -60,12 +60,12 @@ public class PhoenixFilter extends Filter implements PhoenixRel {
         return super.computeSelfCost(planner, mq).multiplyBy(PHOENIX_FACTOR);
     }
 
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         ImmutableIntList columnRefList = implementor.getCurrentContext().columnRefList;
         ImmutableBitSet bitSet = InputFinder.analyze(condition).inputBitSet.addAll(columnRefList).build();
         columnRefList = ImmutableIntList.copyOf(bitSet.asList());
         implementor.pushContext(implementor.getCurrentContext().withColumnRefList(columnRefList));
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         implementor.popContext();
         Expression expr = CalciteUtils.toExpression(condition, implementor);
         return new ClientScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
index 0d77f86..50b6f2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixLimit.java
@@ -22,7 +22,7 @@ import org.apache.phoenix.execute.ClientScanPlan;
 
 import com.google.common.base.Supplier;
 
-public class PhoenixLimit extends SingleRel implements PhoenixRel {
+public class PhoenixLimit extends SingleRel implements PhoenixQueryRel {
     public final RexNode offset;
     public final RexNode fetch;
     
@@ -81,8 +81,8 @@ public class PhoenixLimit extends SingleRel implements PhoenixRel {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         int fetchValue = RexLiteral.intValue(fetch);
         if (plan.getLimit() == null) {
             return plan.limit(fetchValue);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
index f48d1ef..f983439 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixMergeSortUnion.java
@@ -23,7 +23,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
-public class PhoenixMergeSortUnion extends Union implements PhoenixRel {
+public class PhoenixMergeSortUnion extends Union implements PhoenixQueryRel {
 	public final RelCollation collation;
     
     public static PhoenixMergeSortUnion create(final List<RelNode> inputs,
@@ -65,10 +65,10 @@ public class PhoenixMergeSortUnion extends Union implements PhoenixRel {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size());
         for (Ord<RelNode> input : Ord.zip(inputs)) {
-            subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e));
+            subPlans.add(implementor.visitInput(input.i, (PhoenixQueryRel) input.e));
         }
         
         final OrderBy orderBy = PhoenixAbstractSort.getOrderBy(collation, implementor, null);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixQueryRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixQueryRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixQueryRel.java
new file mode 100644
index 0000000..aaeb156
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixQueryRel.java
@@ -0,0 +1,7 @@
+package org.apache.phoenix.calcite.rel;
+
+import org.apache.phoenix.compile.QueryPlan;
+
+public interface PhoenixQueryRel extends PhoenixRel {
+    QueryPlan implement(PhoenixRelImplementor implementor);
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
index 5d8b7b0..7f52837 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRel.java
@@ -1,22 +1,9 @@
 package org.apache.phoenix.calcite.rel;
 
-import java.util.List;
-
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.phoenix.calcite.PhoenixSequence;
-import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMetadataProvider;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.SequenceValueExpression;
-import org.apache.phoenix.execute.RuntimeContext;
-import org.apache.phoenix.execute.TupleProjector;
-import org.apache.phoenix.expression.ColumnExpression;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.parse.SequenceValueParseNode;
-import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.compile.StatementPlan;
 
 /**
  * Relational expression in Phoenix.
@@ -46,39 +33,5 @@ public interface PhoenixRel extends RelNode {
    */
   double SERVER_FACTOR = 0.2;
 
-  QueryPlan implement(Implementor implementor);
-  
-  class ImplementorContext {
-      public final boolean retainPKColumns;
-      public final boolean forceProject;
-      public final ImmutableIntList columnRefList;
-      
-      public ImplementorContext(boolean retainPKColumns, boolean forceProject, ImmutableIntList columnRefList) {
-          this.retainPKColumns = retainPKColumns;
-          this.forceProject = forceProject;
-          this.columnRefList = columnRefList;
-      }
-      
-      public ImplementorContext withColumnRefList(ImmutableIntList columnRefList) {
-          return new ImplementorContext(this.retainPKColumns, this.forceProject, columnRefList);
-      }
-  }
-
-  /** Holds context for an traversal over a tree of relational expressions
-   * to convert it to an executable plan. */
-  interface Implementor {
-    QueryPlan visitInput(int i, PhoenixRel input);
-    ColumnExpression newColumnExpression(int index);
-    @SuppressWarnings("rawtypes")
-    Expression newFieldAccessExpression(String variableId, int index, PDataType type);
-    SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op);
-    RuntimeContext getRuntimeContext();
-    void setTableMapping(TableMapping tableMapping);
-    TableMapping getTableMapping();
-    void setSequenceManager(SequenceManager sequenceManager);
-    void pushContext(ImplementorContext context);
-    ImplementorContext popContext();
-    ImplementorContext getCurrentContext();
-    TupleProjector project(List<Expression> exprs);
-  }
+  StatementPlan implement(PhoenixRelImplementor implementor);
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
new file mode 100644
index 0000000..b2f275c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementor.java
@@ -0,0 +1,50 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.util.List;
+
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.phoenix.calcite.PhoenixSequence;
+import org.apache.phoenix.calcite.TableMapping;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.SequenceValueExpression;
+import org.apache.phoenix.execute.RuntimeContext;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.expression.ColumnExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.SequenceValueParseNode;
+import org.apache.phoenix.schema.types.PDataType;
+
+/** Holds context for a traversal over a tree of relational expressions
+ * to convert it to an executable plan. */
+public interface PhoenixRelImplementor {
+    QueryPlan visitInput(int i, PhoenixQueryRel input);
+    ColumnExpression newColumnExpression(int index);
+    @SuppressWarnings("rawtypes")
+    Expression newFieldAccessExpression(String variableId, int index, PDataType type);
+    SequenceValueExpression newSequenceExpression(PhoenixSequence seq, SequenceValueParseNode.Op op);
+    RuntimeContext getRuntimeContext();
+    void setTableMapping(TableMapping tableMapping);
+    TableMapping getTableMapping();
+    void setSequenceManager(SequenceManager sequenceManager);
+    void pushContext(ImplementorContext context);
+    ImplementorContext popContext();
+    ImplementorContext getCurrentContext();
+    TupleProjector project(List<Expression> exprs);
+    
+    class ImplementorContext {
+        public final boolean retainPKColumns;
+        public final boolean forceProject;
+        public final ImmutableIntList columnRefList;
+        
+        public ImplementorContext(boolean retainPKColumns, boolean forceProject, ImmutableIntList columnRefList) {
+            this.retainPKColumns = retainPKColumns;
+            this.forceProject = forceProject;
+            this.columnRefList = columnRefList;
+        }
+        
+        public ImplementorContext withColumnRefList(ImmutableIntList columnRefList) {
+            return new ImplementorContext(this.retainPKColumns, this.forceProject, columnRefList);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
index 759f09b..f76ab35 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixRelImplementorImpl.java
@@ -7,7 +7,6 @@ import java.util.Stack;
 
 import org.apache.phoenix.calcite.PhoenixSequence;
 import org.apache.phoenix.calcite.TableMapping;
-import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.SequenceManager;
 import org.apache.phoenix.compile.SequenceValueExpression;
@@ -31,7 +30,7 @@ import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.types.PDataType;
 import com.google.common.collect.Lists;
 
-public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
+public class PhoenixRelImplementorImpl implements PhoenixRelImplementor {
     private final RuntimeContext runtimeContext;
 	private Stack<ImplementorContext> contextStack;
 	private SequenceManager sequenceManager;
@@ -43,7 +42,7 @@ public class PhoenixRelImplementorImpl implements PhoenixRel.Implementor {
 	}
 	
     @Override
-    public QueryPlan visitInput(int i, PhoenixRel input) {
+    public QueryPlan visitInput(int i, PhoenixQueryRel input) {
         return input.implement(this);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
index 68b7f04..65cca2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerAggregate.java
@@ -54,9 +54,9 @@ public class PhoenixServerAggregate extends PhoenixAbstractAggregate {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         implementor.pushContext(implementor.getCurrentContext().withColumnRefList(getColumnRefList()));
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         implementor.popContext();
         
         assert (plan instanceof ScanPlan 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
index 8690377..acb14ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerJoin.java
@@ -19,6 +19,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.phoenix.calcite.CalciteUtils;
 import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
 import org.apache.phoenix.compile.JoinCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
@@ -105,7 +106,7 @@ public class PhoenixServerJoin extends PhoenixAbstractJoin {
     }
     
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         List<Expression> leftExprs = Lists.<Expression> newArrayList();
         List<Expression> rightExprs = Lists.<Expression> newArrayList();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
index ba0854b..7932c3f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerProject.java
@@ -13,6 +13,7 @@ import org.apache.calcite.rel.metadata.RelMdCollation;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.execute.TupleProjector;
@@ -58,9 +59,9 @@ public class PhoenixServerProject extends PhoenixAbstractProject {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         implementor.pushContext(new ImplementorContext(implementor.getCurrentContext().retainPKColumns, false, getColumnRefList()));
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         implementor.popContext();
         
         assert (plan instanceof ScanPlan);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
index 2b133aa..6abb2c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSemiJoin.java
@@ -17,6 +17,7 @@ import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.TableMapping;
 import org.apache.phoenix.calcite.metadata.PhoenixRelMdCollation;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.expression.Expression;
@@ -92,7 +93,7 @@ public class PhoenixServerSemiJoin extends PhoenixAbstractSemiJoin {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         List<Expression> leftExprs = Lists.<Expression> newArrayList();
         List<Expression> rightExprs = Lists.<Expression> newArrayList();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
index 3053cca..9ace803 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixServerSort.java
@@ -49,11 +49,11 @@ public class PhoenixServerSort extends PhoenixAbstractSort {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         if (this.offset != null)
             throw new UnsupportedOperationException();
             
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         assert (plan instanceof ScanPlan 
                     || plan instanceof HashJoinPlan) 
                 && plan.getLimit() == null;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
new file mode 100644
index 0000000..d5e4836
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@ -0,0 +1,159 @@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare.CatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Phoenix implementation of Calcite's {@link TableModify} relational expression.
+ *
+ * <p>Only INSERT is accepted by {@link #implement(PhoenixRelImplementor)}; it is surfaced
+ * to Phoenix as an UPSERT {@link MutationPlan} built on the plan of the single input.
+ * Executing that plan is not implemented yet.
+ *
+ * <p>Inside this class the simple name {@code Operation} resolves to the member type
+ * inherited from {@link TableModify}, which shadows the imported
+ * {@code PhoenixStatement.Operation}; the latter is therefore written fully qualified.
+ */
+public class PhoenixTableModify extends TableModify implements PhoenixRel {
+
+    public PhoenixTableModify(RelOptCluster cluster, RelTraitSet traits,
+            RelOptTable table, CatalogReader catalogReader, RelNode child,
+            Operation operation, List<String> updateColumnList, boolean flattened) {
+        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened);
+    }
+
+    /** Creates an equivalent node over the given traits and (sole) input. */
+    @Override
+    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new PhoenixTableModify(
+                getCluster(),
+                traitSet,
+                getTable(),
+                getCatalogReader(),
+                sole(inputs),
+                getOperation(),
+                getUpdateColumnList(),
+                isFlattened());
+    }
+
+    /**
+     * Builds the mutation plan for this node on top of the query plan of its input.
+     *
+     * @param implementor used to visit the input and obtain its {@link QueryPlan}
+     * @return a {@link MutationPlan} that reports the UPSERT operation
+     * @throws UnsupportedOperationException if the operation is not INSERT
+     */
+    @Override
+    public StatementPlan implement(PhoenixRelImplementor implementor) {
+        if (getOperation() != Operation.INSERT) {
+            throw new UnsupportedOperationException(
+                    "Unsupported table modify operation: " + getOperation());
+        }
+
+        final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input);
+        final TableRef targetTableRef = getTable().unwrap(PhoenixTable.class).tableMapping.getTableRef();
+        return new MutationPlan() {
+            @Override
+            public ParameterMetaData getParameterMetaData() {
+                return queryPlan.getContext().getBindManager().getParameterMetaData();
+            }
+
+            @Override
+            public StatementContext getContext() {
+                return queryPlan.getContext();
+            }
+
+            @Override
+            public TableRef getTargetRef() {
+                return targetTableRef;
+            }
+
+            @Override
+            public Set<TableRef> getSourceRefs() {
+                // TODO return originalQueryPlan.getSourceRefs();
+                return queryPlan.getSourceRefs();
+            }
+
+            @Override
+            public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
+                return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
+            }
+
+            /**
+             * Not implemented yet: always throws. The commented-out code below is kept as
+             * a sketch of the intended upsert-select execution.
+             */
+            @Override
+            public MutationState execute() throws SQLException {
+//                ResultIterator iterator = queryPlan.iterator();
+//                if (parallelIteratorFactory == null) {
+//                    return upsertSelect(new StatementContext(statement), tableRef, projector, iterator, columnIndexes, pkSlotIndexes, useServerTimestamp);
+//                }
+//                try {
+//                    parallelIteratorFactory.setRowProjector(projector);
+//                    parallelIteratorFactory.setColumnIndexes(columnIndexes);
+//                    parallelIteratorFactory.setPkSlotIndexes(pkSlotIndexes);
+//                    Tuple tuple;
+//                    long totalRowCount = 0;
+//                    StatementContext context = queryPlan.getContext();
+//                    while ((tuple=iterator.next()) != null) {// Runs query
+//                        Cell kv = tuple.getValue(0);
+//                        totalRowCount += PLong.INSTANCE.getCodec().decodeLong(kv.getValueArray(), kv.getValueOffset(), SortOrder.getDefault());
+//                    }
+//                    // Return total number of rows that have been updated. In the case of auto commit being off
+//                    // the mutations will all be in the mutation state of the current connection.
+//                    MutationState mutationState = new MutationState(maxSize, statement.getConnection(), totalRowCount);
+//                    /*
+//                     *  All the metrics collected for measuring the reads done by the parallel mutating iterators
+//                     *  is included in the ReadMetricHolder of the statement context. Include these metrics in the
+//                     *  returned mutation state so they can be published on commit.
+//                     */
+//                    mutationState.setReadMetricQueue(context.getReadMetricsQueue());
+//                    return mutationState;
+//                } finally {
+//                    iterator.close();
+//                }
+                // Fail fast rather than hand back a null MutationState, which would
+                // presumably only surface later as a NullPointerException in the caller.
+                throw new UnsupportedOperationException(
+                        "Executing PhoenixTableModify is not supported yet");
+            }
+
+            /** Prefixes the input's explain steps with an "UPSERT SELECT" step. */
+            @Override
+            public ExplainPlan getExplainPlan() throws SQLException {
+                List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
+                List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+                planSteps.add("UPSERT SELECT");
+                planSteps.addAll(queryPlanSteps);
+                return new ExplainPlan(planSteps);
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
index e9324e2..b4d64b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableScan.java
@@ -54,7 +54,7 @@ import com.google.common.collect.ImmutableList.Builder;
 /**
  * Scan of a Phoenix table.
  */
-public class PhoenixTableScan extends TableScan implements PhoenixRel {
+public class PhoenixTableScan extends TableScan implements PhoenixQueryRel {
     public enum ScanOrder {
         NONE,
         FORWARD,
@@ -123,7 +123,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
                 //    time when the correlate variable has not been defined yet.
                 // 2) get a guess of ScanRange even if the runtime value is absent.
                 // 3) test whether this dynamic filter is worth a recompile at runtime.
-                Implementor tmpImplementor = new PhoenixRelImplementorImpl(null) {                    
+                PhoenixRelImplementor tmpImplementor = new PhoenixRelImplementorImpl(null) {                    
                     @SuppressWarnings("rawtypes")
                     @Override
                     public Expression newFieldAccessExpression(String variableId, int index, PDataType type) {
@@ -244,7 +244,7 @@ public class PhoenixTableScan extends TableScan implements PhoenixRel {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         final PhoenixTable phoenixTable = table.unwrap(PhoenixTable.class);
         TableMapping tableMapping = phoenixTable.tableMapping;
         implementor.setTableMapping(tableMapping);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java
index 33aa797..596212f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTemporarySort.java
@@ -38,7 +38,7 @@ public class PhoenixTemporarySort extends PhoenixAbstractSort {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         throw new UnsupportedOperationException();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
index 677baa5..5e750b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
@@ -25,10 +25,11 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.phoenix.calcite.BuiltInMethod;
-import org.apache.phoenix.calcite.rel.PhoenixRel.ImplementorContext;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.execute.DelegateQueryPlan;
 import org.apache.phoenix.execute.RuntimeContextImpl;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
@@ -72,7 +73,7 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume
         //   ResultIterator iterator = root.get("x");
         //   return CalciteRuntime.toEnumerable(iterator);
         final BlockBuilder list = new BlockBuilder();
-        QueryPlan plan = makePlan((PhoenixRel)getInput());
+        StatementPlan plan = makePlan((PhoenixRel)getInput());
         Expression var = stash(implementor, plan, QueryPlan.class);
         final RelDataType rowType = getRowType();
         final PhysType physType =
@@ -89,11 +90,15 @@ public class PhoenixToEnumerableConverter extends ConverterImpl implements Enume
         return implementor.result(physType, list.toBlock());
     }
     
-    static QueryPlan makePlan(PhoenixRel rel) {
-        final PhoenixRel.Implementor phoenixImplementor = new PhoenixRelImplementorImpl(new RuntimeContextImpl());
+    static StatementPlan makePlan(PhoenixRel rel) {
+        final PhoenixRelImplementor phoenixImplementor = new PhoenixRelImplementorImpl(new RuntimeContextImpl());
         phoenixImplementor.pushContext(new ImplementorContext(true, false, ImmutableIntList.identity(rel.getRowType().getFieldCount())));
-        final QueryPlan plan = phoenixImplementor.visitInput(0, rel);
-        return new DelegateQueryPlan(plan) {
+        final StatementPlan plan = rel.implement(phoenixImplementor);
+        if (!(plan instanceof QueryPlan)) {
+            return plan;
+        }
+            
+        return new DelegateQueryPlan((QueryPlan) plan) {
             @Override
             public ResultIterator iterator() throws SQLException {
                 return iterator(DefaultParallelScanGrouper.getInstance());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
index f591fde..1e21054 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUncollect.java
@@ -21,7 +21,7 @@ import org.apache.phoenix.schema.types.PInteger;
 
 import com.google.common.collect.Lists;
 
-public class PhoenixUncollect extends Uncollect implements PhoenixRel {
+public class PhoenixUncollect extends Uncollect implements PhoenixQueryRel {
     
     public static PhoenixUncollect create(RelNode input, boolean withOrdinality) {
         RelOptCluster cluster = input.getCluster();
@@ -49,8 +49,8 @@ public class PhoenixUncollect extends Uncollect implements PhoenixRel {
     }
     
     @Override
-    public QueryPlan implement(Implementor implementor) {
-        QueryPlan plan = implementor.visitInput(0, (PhoenixRel) getInput());
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
+        QueryPlan plan = implementor.visitInput(0, (PhoenixQueryRel) getInput());
         Expression arrayExpression = implementor.newColumnExpression(0);
         @SuppressWarnings("rawtypes")
         PDataType baseType = PDataType.fromTypeId(arrayExpression.getDataType().getSqlType() - PDataType.ARRAY_TYPE_BASE);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
index 57393c8..675abc1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixUnion.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Lists;
  * Implementation of {@link org.apache.calcite.rel.core.Union}
  * relational expression in Phoenix.
  */
-public class PhoenixUnion extends Union implements PhoenixRel {
+public class PhoenixUnion extends Union implements PhoenixQueryRel {
     
     public static PhoenixUnion create(List<RelNode> inputs, boolean all) {
         RelOptCluster cluster = inputs.get(0).getCluster();
@@ -52,10 +52,10 @@ public class PhoenixUnion extends Union implements PhoenixRel {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         List<QueryPlan> subPlans = Lists.newArrayListWithExpectedSize(inputs.size());
         for (Ord<RelNode> input : Ord.zip(inputs)) {
-            subPlans.add(implementor.visitInput(input.i, (PhoenixRel) input.e));
+            subPlans.add(implementor.visitInput(input.i, (PhoenixQueryRel) input.e));
         }
         
         return new UnionPlan(subPlans.get(0).getContext(), SelectStatement.SELECT_ONE, subPlans.get(0).getTableRef(), RowProjector.EMPTY_PROJECTOR,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
index c0fd272..886e861 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixValues.java
@@ -55,7 +55,7 @@ import com.google.common.collect.Lists;
  * Implementation of {@link org.apache.calcite.rel.core.Values}
  * relational expression in Phoenix.
  */
-public class PhoenixValues extends Values implements PhoenixRel {
+public class PhoenixValues extends Values implements PhoenixQueryRel {
     
     private static final PhoenixConnection phoenixConnection;
     static {
@@ -107,7 +107,7 @@ public class PhoenixValues extends Values implements PhoenixRel {
     }
 
     @Override
-    public QueryPlan implement(Implementor implementor) {
+    public QueryPlan implement(PhoenixRelImplementor implementor) {
         List<Tuple> literalResult = Lists.newArrayList();
         Iterator<ImmutableList<RexLiteral>> iter = getTuples().iterator();
         Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/3425347e/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
index 396974d..c42df99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rules/PhoenixConverterRules.java
@@ -32,10 +32,12 @@ import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.logical.LogicalSort;
+import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.rel.logical.LogicalUnion;
 import org.apache.calcite.rel.logical.LogicalValues;
 import org.apache.calcite.rex.RexNode;
 import org.apache.phoenix.calcite.CalciteUtils;
+import org.apache.phoenix.calcite.PhoenixTable;
 import org.apache.phoenix.calcite.rel.PhoenixAbstractAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientAggregate;
 import org.apache.phoenix.calcite.rel.PhoenixClientJoin;
@@ -51,6 +53,7 @@ import org.apache.phoenix.calcite.rel.PhoenixServerJoin;
 import org.apache.phoenix.calcite.rel.PhoenixServerProject;
 import org.apache.phoenix.calcite.rel.PhoenixServerSemiJoin;
 import org.apache.phoenix.calcite.rel.PhoenixServerSort;
+import org.apache.phoenix.calcite.rel.PhoenixTableModify;
 import org.apache.phoenix.calcite.rel.PhoenixToEnumerableConverter;
 import org.apache.phoenix.calcite.rel.PhoenixUncollect;
 import org.apache.phoenix.calcite.rel.PhoenixUnion;
@@ -90,6 +93,7 @@ public class PhoenixConverterRules {
         PhoenixValuesRule.INSTANCE,
         PhoenixUncollectRule.INSTANCE,
         PhoenixCorrelateRule.INSTANCE,
+        PhoenixTableModifyRule.INSTANCE,
     };
 
     public static final RelOptRule[] CONVERTIBLE_RULES = {
@@ -114,6 +118,7 @@ public class PhoenixConverterRules {
         PhoenixValuesRule.INSTANCE,
         PhoenixUncollectRule.INSTANCE,
         PhoenixCorrelateRule.INSTANCE,
+        PhoenixTableModifyRule.INSTANCE,
     };
 
     /** Base class for planner rules that convert a relational expression to
@@ -816,6 +821,40 @@ public class PhoenixConverterRules {
     
 
     /**
+     * Rule to convert a {@link org.apache.calcite.rel.logical.LogicalTableModify} to a
+     * {@link PhoenixTableModify}.
+     */
+    public static class PhoenixTableModifyRule extends PhoenixConverterRule {
+
+        private static final PhoenixTableModifyRule INSTANCE = new PhoenixTableModifyRule();
+
+        /** Hands LogicalTableModify, Convention.NONE and CLIENT to the base converter rule. */
+        private PhoenixTableModifyRule() {
+            super(LogicalTableModify.class, Convention.NONE,
+                    PhoenixConvention.CLIENT, "PhoenixTableModifyRule");
+        }
+
+        /** Returns null when the target table does not unwrap to a PhoenixTable. */
+        public RelNode convert(RelNode rel) {
+            final LogicalTableModify tableModify = (LogicalTableModify) rel;
+            if (tableModify.getTable().unwrap(PhoenixTable.class) == null) {
+                return null;
+            }
+
+            // The input is requested in GENERIC convention; the modify itself is CLIENT.
+            final RelNode convertedInput = convert(tableModify.getInput(),
+                    tableModify.getTraitSet().replace(PhoenixConvention.GENERIC));
+            return new PhoenixTableModify(
+                    tableModify.getCluster(),
+                    tableModify.getTraitSet().replace(PhoenixConvention.CLIENT),
+                    tableModify.getTable(), tableModify.getCatalogReader(),
+                    convertedInput,
+                    tableModify.getOperation(), tableModify.getUpdateColumnList(),
+                    tableModify.isFlattened());
+        }
+    }
+
+    /**
      * Rule to convert a relational expression from
      * {@link org.apache.phoenix.calcite.rel.PhoenixConvention} to
      * {@link org.apache.calcite.adapter.enumerable.EnumerableConvention}.