You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/02/12 09:16:45 UTC

[2/2] git commit: PHOENIX-40 Add support for batching (JamesTaylor)

PHOENIX-40 Add support for batching (JamesTaylor)


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/0b3d9487
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/0b3d9487
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/0b3d9487

Branch: refs/heads/master
Commit: 0b3d9487606c5ba0c03865646a8f59c2bb793419
Parents: a8e4dd7
Author: James Taylor <ja...@apache.org>
Authored: Wed Feb 12 00:16:35 2014 -0800
Committer: James Taylor <ja...@apache.org>
Committed: Wed Feb 12 00:16:35 2014 -0800

----------------------------------------------------------------------
 .../phoenix/exception/BatchUpdateExecution.java |  38 +
 .../phoenix/exception/SQLExceptionCode.java     |   2 +
 .../apache/phoenix/execute/BasicQueryPlan.java  |   8 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   6 +-
 .../phoenix/jdbc/PhoenixPreparedStatement.java  |  71 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   | 975 ++++++++-----------
 .../apache/phoenix/optimize/QueryOptimizer.java |  17 +-
 .../apache/phoenix/parse/BindableStatement.java |   3 +
 .../phoenix/parse/CreateSequenceStatement.java  |   2 +-
 .../phoenix/parse/CreateTableStatement.java     |   6 +-
 .../apache/phoenix/parse/DeleteStatement.java   |   6 +
 .../phoenix/parse/DropIndexStatement.java       |   8 +-
 .../phoenix/parse/DropSequenceStatement.java    |   9 +-
 .../phoenix/parse/DropTableStatement.java       |   8 +-
 .../apache/phoenix/parse/ExplainStatement.java  |   7 +
 .../apache/phoenix/parse/MutableStatement.java  |  31 +
 .../apache/phoenix/parse/SelectStatement.java   |   6 +
 .../phoenix/parse/SingleTableSQLStatement.java  |   2 +-
 .../phoenix/compile/QueryCompilerTest.java      |  40 +
 .../phoenix/end2end/UpsertValuesTest.java       |  91 +-
 20 files changed, 720 insertions(+), 616 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
new file mode 100644
index 0000000..8bd4d3b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/BatchUpdateExecution.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.exception;
+
+import java.sql.SQLException;
+
+/**
+ * {@link SQLException} that reports {@link SQLExceptionCode#BATCH_EXCEPTION}.
+ * <p>
+ * The message, SQL state and vendor error code are all built from that
+ * single code; the underlying failure is chained as the cause and the
+ * supplied batch index is retained so the caller can read it back.
+ */
+public class BatchUpdateExecution extends SQLException {
+    private static final long serialVersionUID = 1L;
+    // Every instance reports the same code, so it is a true constant.
+    private static final SQLExceptionCode CODE = SQLExceptionCode.BATCH_EXCEPTION;
+    private final int batchIndex;
+
+    /**
+     * @param cause the failure being wrapped; chained as this exception's cause
+     * @param batchIndex index associated with the failure (presumably the position
+     *                   in the batch of the statement that failed -- confirm against caller)
+     */
+    public BatchUpdateExecution(Throwable cause, int batchIndex) {
+        super(new SQLExceptionInfo.Builder(CODE).build().toString(),
+                CODE.getSQLState(), CODE.getErrorCode(), cause);
+        this.batchIndex = batchIndex;
+    }
+
+    /**
+     * @return the batch index supplied at construction time
+     */
+    public int getBatchIndex() {
+        return batchIndex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index a3fd5a8..1d01c14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -247,6 +247,8 @@ public enum SQLExceptionCode {
     EXECUTE_QUERY_NOT_APPLICABLE(1103, "XCL03", "executeQuery may not be used."),
     EXECUTE_UPDATE_NOT_APPLICABLE(1104, "XCL03", "executeUpdate may not be used."),
     SPLIT_POINT_NOT_CONSTANT(1105, "XCL04", "Split points must be constants."),
+    BATCH_EXCEPTION(1106, "XCL05", "Exception while executing batch."),
+    EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH(1107, "XCL06", "An executeUpdate is prohibited when the batch is not empty. Use clearBatch to empty the batch first."),
     
     /**
      * Implementation defined class. Phoenix internal error. (errorcode 20, sqlstate INT).

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
index f866322..8305ac4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BasicQueryPlan.java
@@ -26,8 +26,6 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import com.google.common.collect.Lists;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
@@ -46,6 +44,8 @@ import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 
+import com.google.common.collect.Lists;
+
 
 
 /**
@@ -189,7 +189,9 @@ public abstract class BasicQueryPlan implements QueryPlan {
             return new ExplainPlan(Collections.singletonList("DEGENERATE SCAN OVER " + tableRef.getTable().getName().getString()));
         }
         
-        ResultIterator iterator = iterator();
+        // Optimize here when getting explain plan, as queries don't get optimized until after compilation
+        QueryPlan plan = context.getConnection().getQueryServices().getOptimizer().optimize(this, context.getStatement());
+        ResultIterator iterator = plan.iterator();
         List<String> planSteps = Lists.newArrayListWithExpectedSize(5);
         iterator.explain(planSteps);
         return new ExplainPlan(planSteps);

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 4cb62fd..572a3c8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -51,12 +51,12 @@ import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -1101,7 +1101,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
 
     @Override
     public boolean supportsBatchUpdates() throws SQLException {
-        return false; // FIXME?
+        return true;
     }
 
     @Override
@@ -1287,7 +1287,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho
 
     @Override
     public boolean supportsOuterJoins() throws SQLException {
-        return false;
+        return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
index b6b97ed..de3f7d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixPreparedStatement.java
@@ -41,6 +41,7 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLXML;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Calendar;
@@ -71,7 +72,7 @@ import org.apache.phoenix.util.SQLCloseable;
  */
 public class PhoenixPreparedStatement extends PhoenixStatement implements PreparedStatement, SQLCloseable {
     private final List<Object> parameters;
-    private final ExecutableStatement statement;
+    private final CompilableStatement statement;
 
     private final String query;
 
@@ -93,11 +94,20 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
         Collections.fill(parameters, BindManager.UNBOUND_PARAMETER);
     }
 
+    public PhoenixPreparedStatement(PhoenixPreparedStatement statement) throws SQLException {
+        super(statement.connection);
+        this.query = statement.query;
+        this.statement = statement.statement;
+        this.parameters = new ArrayList<Object>(statement.parameters);
+    }
+
     @Override
     public void addBatch() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        throwIfUnboundParameters();
+        batch.add(new PhoenixPreparedStatement(this));
     }
 
+
     @Override
     public void clearParameters() throws SQLException {
         Collections.fill(parameters, BindManager.UNBOUND_PARAMETER);
@@ -119,58 +129,47 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
         }
     }
     
-    @Override
-    public boolean execute() throws SQLException {
+    
+    boolean execute(boolean batched) throws SQLException {
         throwIfUnboundParameters();
-        try {
-            return statement.execute();
-        } catch (RuntimeException e) {
-            // FIXME: Expression.evaluate does not throw SQLException
-            // so this will unwrap throws from that.
-            if (e.getCause() instanceof SQLException) {
-                throw (SQLException) e.getCause();
-            }
-            throw e;
+        if (!batched && statement.getOperation().isMutation() && !batch.isEmpty()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
+            .build().buildException();
         }
+        return execute(statement);
+    }
+
+    @Override
+    public boolean execute() throws SQLException {
+        return execute(false);
     }
 
     @Override
     public ResultSet executeQuery() throws SQLException {
         throwIfUnboundParameters();
-        try {
-            return statement.executeQuery();
-        } catch (RuntimeException e) {
-            // FIXME: Expression.evaluate does not throw SQLException
-            // so this will unwrap throws from that.
-            if (e.getCause() instanceof SQLException) {
-                throw (SQLException) e.getCause();
-            }
-            throw e;
-        }
+        return executeQuery(statement);
     }
 
     @Override
     public int executeUpdate() throws SQLException {
         throwIfUnboundParameters();
-        try {
-            return statement.executeUpdate();
-        } catch (RuntimeException e) {
-            // FIXME: Expression.evaluate does not throw SQLException
-            // so this will unwrap throws from that.
-            if (e.getCause() instanceof SQLException) {
-                throw (SQLException) e.getCause();
-            }
-            throw e;
+        if (!batch.isEmpty()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
+            .build().buildException();
         }
+        return executeMutation(statement);
     }
 
     public QueryPlan optimizeQuery() throws SQLException {
         throwIfUnboundParameters();
-        return (QueryPlan)statement.optimizePlan();
+        return optimizeQuery(statement);
     }
 
     @Override
     public ResultSetMetaData getMetaData() throws SQLException {
+        if (statement.getOperation().isMutation()) {
+            return null;
+        }
         int paramCount = statement.getBindCount();
         List<Object> params = this.getParameters();
         BitSet unsetParams = new BitSet(statement.getBindCount());
@@ -181,7 +180,9 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
             }
         }
         try {
-            return statement.getResultSetMetaData();
+            // Just compile top level query without optimizing to get ResultSetMetaData
+            QueryPlan plan = (QueryPlan)statement.compilePlan(this);
+            return new PhoenixResultSetMetaData(this.getConnection(), plan.getProjector());
         } finally {
             int lastSetBit = 0;
             while ((lastSetBit = unsetParams.nextSetBit(lastSetBit)) != -1) {
@@ -203,7 +204,7 @@ public class PhoenixPreparedStatement extends PhoenixStatement implements Prepar
             }
         }
         try {
-            StatementPlan plan = statement.compilePlan();
+            StatementPlan plan = statement.compilePlan(this);
             return plan.getParameterMetaData();
         } finally {
             int lastSetBit = 0;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index f7bcf3f..aa51951 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.Reader;
 import java.sql.ParameterMetaData;
 import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLWarning;
@@ -36,9 +35,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Lists;
 import org.apache.phoenix.compile.ColumnProjector;
 import org.apache.phoenix.compile.CreateIndexCompiler;
 import org.apache.phoenix.compile.CreateSequenceCompiler;
@@ -47,13 +43,17 @@ import org.apache.phoenix.compile.DeleteCompiler;
 import org.apache.phoenix.compile.DropSequenceCompiler;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExpressionProjector;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryCompiler;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.compile.UpsertCompiler;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.exception.BatchUpdateExecution;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
@@ -75,6 +75,7 @@ import org.apache.phoenix.parse.DropIndexStatement;
 import org.apache.phoenix.parse.DropSequenceStatement;
 import org.apache.phoenix.parse.DropTableStatement;
 import org.apache.phoenix.parse.ExplainStatement;
+import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.parse.LimitNode;
 import org.apache.phoenix.parse.NamedNode;
@@ -88,10 +89,10 @@ import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.parse.UpsertStatement;
+import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
 import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -100,6 +101,8 @@ import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -108,6 +111,9 @@ import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ServerUtil;
 
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+
 
 /**
  * 
@@ -128,20 +134,27 @@ import org.apache.phoenix.util.ServerUtil;
  * @since 0.1
  */
 public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
-    public enum UpdateOperation {
-        DELETED("deleted"),
-        UPSERTED("upserted");
+    public enum Operation {
+        QUERY("queried", false),
+        DELETE("deleted", true),
+        UPSERT("upserted", true);
         
         private final String toString;
-        UpdateOperation(String toString) {
+        private final boolean isMutation;
+        Operation(String toString, boolean isMutation) {
             this.toString = toString;
+            this.isMutation = isMutation;
+        }
+        
+        public boolean isMutation() {
+            return isMutation;
         }
         
         @Override
         public String toString() {
             return toString;
         }
-        };
+    };
 
     protected final PhoenixConnection connection;
     private static final int NO_UPDATE = -1;
@@ -149,9 +162,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
     private QueryPlan lastQueryPlan;
     private PhoenixResultSet lastResultSet;
     private int lastUpdateCount = NO_UPDATE;
-    private UpdateOperation lastUpdateOperation;
+    private Operation lastUpdateOperation;
     private boolean isClosed = false;
-    private ResultSetMetaData resultSetMetaData;
     private int maxRows;
     
     
@@ -164,387 +176,315 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
     }
     
     protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector) throws SQLException {
-        return new PhoenixResultSet(iterator, projector, PhoenixStatement.this);
+        return new PhoenixResultSet(iterator, projector, this);
     }
     
-    protected static interface ExecutableStatement extends BindableStatement {
-        public boolean execute() throws SQLException;
-        public int executeUpdate() throws SQLException;
-        public PhoenixResultSet executeQuery() throws SQLException;
-        public ResultSetMetaData getResultSetMetaData() throws SQLException;
-        public StatementPlan optimizePlan() throws SQLException;
-        public StatementPlan compilePlan() throws SQLException;
+    protected boolean execute(CompilableStatement stmt) throws SQLException {
+        if (stmt.getOperation().isMutation()) {
+            executeMutation(stmt);
+            return false;
+        }
+        executeQuery(stmt);
+        return true;
     }
     
-    protected static interface MutatableStatement extends ExecutableStatement {
-        @Override
-        public MutationPlan optimizePlan() throws SQLException;
+    protected QueryPlan optimizeQuery(CompilableStatement stmt) throws SQLException {
+        QueryPlan plan = stmt.compilePlan(this);
+        return connection.getQueryServices().getOptimizer().optimize(plan, this);
     }
     
-    private class ExecutableSelectStatement extends SelectStatement implements ExecutableStatement {
-        private ExecutableSelectStatement(List<? extends TableNode> from, HintNode hint, boolean isDistinct, List<AliasedNode> select, ParseNode where,
-                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
-            super(from, hint, isDistinct, select, where, groupBy, having, orderBy, limit, bindCount, isAggregate);
-        }
-
-        @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            QueryPlan plan = optimizePlan();
+    protected PhoenixResultSet executeQuery(CompilableStatement stmt) throws SQLException {
+        try {
+            QueryPlan plan = stmt.compilePlan(this);
+            plan = connection.getQueryServices().getOptimizer().optimize(plan, this);
             PhoenixResultSet rs = newResultSet(plan.iterator(), plan.getProjector());
             resultSets.add(rs);
-            lastResultSet = rs;
-            lastUpdateCount = NO_UPDATE;
-            lastUpdateOperation = null;
+            setLastQueryPlan(plan);
+            setLastResultSet(rs);
+            setLastUpdateCount(NO_UPDATE);
+            setLastUpdateOperation(stmt.getOperation());
             return rs;
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeQuery();
-            return true;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            throw new ExecuteUpdateNotApplicableException(this.toString());
-        }
-
-        @Override
-        public QueryPlan optimizePlan() throws SQLException {
-            return lastQueryPlan = connection.getQueryServices().getOptimizer().optimize(this, PhoenixStatement.this);
-        }
-        
-        @Override
-        public StatementPlan compilePlan() throws SQLException {
-            return new QueryCompiler(PhoenixStatement.this).compile(this);
-        }
-        
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            if (resultSetMetaData == null) {
-                // Just compile top level query without optimizing to get ResultSetMetaData
-                QueryPlan plan = new QueryCompiler(PhoenixStatement.this).compile(this);
-                resultSetMetaData = new PhoenixResultSetMetaData(connection, plan.getProjector());
+        } catch (RuntimeException e) {
+            // FIXME: Expression.evaluate does not throw SQLException
+            // so this will unwrap throws from that.
+            if (e.getCause() instanceof SQLException) {
+                throw (SQLException) e.getCause();
             }
-            return resultSetMetaData;
+            throw e;
         }
     }
     
-    private int executeMutation(MutationPlan plan) throws SQLException {
+    protected int executeMutation(CompilableStatement stmt) throws SQLException {
         // Note that the upsert select statements will need to commit any open transaction here,
         // since they'd update data directly from coprocessors, and should thus operate on
         // the latest state
-        MutationState state = plan.execute();
-        connection.getMutationState().join(state);
-        if (connection.getAutoCommit()) {
-            connection.commit();
-        }
-        lastResultSet = null;
-        lastQueryPlan = null;
-        // Unfortunately, JDBC uses an int for update count, so we
-        // just max out at Integer.MAX_VALUE
-        long updateCount = state.getUpdateCount();
-        lastUpdateCount = (int)Math.min(Integer.MAX_VALUE, updateCount);
-        return lastUpdateCount;
+        try {
+            MutationPlan plan = stmt.compilePlan(this);
+            MutationState state = plan.execute();
+            connection.getMutationState().join(state);
+            if (connection.getAutoCommit()) {
+                connection.commit();
+            }
+            setLastResultSet(null);
+            setLastQueryPlan(null);
+            // Unfortunately, JDBC uses an int for update count, so we
+            // just max out at Integer.MAX_VALUE
+            int lastUpdateCount = (int)Math.min(Integer.MAX_VALUE, state.getUpdateCount());
+            setLastUpdateCount(lastUpdateCount);
+            setLastUpdateOperation(stmt.getOperation());
+            return lastUpdateCount;
+        } catch (RuntimeException e) {
+            // FIXME: Expression.evaluate does not throw SQLException
+            // so this will unwrap throws from that.
+            if (e.getCause() instanceof SQLException) {
+                throw (SQLException) e.getCause();
+            }
+            throw e;
+        }
     }
     
-    private class ExecutableUpsertStatement extends UpsertStatement implements MutatableStatement {
-        private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
-            super(table, hintNode, columns, values, select, bindCount);
+    protected static interface CompilableStatement extends BindableStatement {
+        public <T extends StatementPlan> T compilePlan (PhoenixStatement stmt) throws SQLException;
+    }
+    
+    private static class ExecutableSelectStatement extends SelectStatement implements CompilableStatement {
+        private ExecutableSelectStatement(List<? extends TableNode> from, HintNode hint, boolean isDistinct, List<AliasedNode> select, ParseNode where,
+                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
+            super(from, hint, isDistinct, select, where, groupBy, having, orderBy, limit, bindCount, isAggregate);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("upsert", this.toString());
+        public QueryPlan compilePlan(PhoenixStatement stmt) throws SQLException {
+            return new QueryCompiler(stmt).compile(this);
         }
-
+    }
+    
+    private static final byte[] EXPLAIN_PLAN_FAMILY = QueryConstants.SINGLE_COLUMN_FAMILY;
+    private static final byte[] EXPLAIN_PLAN_COLUMN = PDataType.VARCHAR.toBytes("Plan");
+    private static final String EXPLAIN_PLAN_ALIAS = "PLAN";
+    private static final String EXPLAIN_PLAN_TABLE_NAME = "PLAN_TABLE";
+    private static final PDatum EXPLAIN_PLAN_DATUM = new PDatum() {
         @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
+        public boolean isNullable() {
             return false;
         }
-
         @Override
-        public int executeUpdate() throws SQLException {
-            lastUpdateOperation = UpdateOperation.UPSERTED;
-            return executeMutation(optimizePlan());
+        public PDataType getDataType() {
+            return PDataType.VARCHAR;
         }
-
         @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+        public Integer getByteSize() {
             return null;
         }
-
         @Override
-        public MutationPlan compilePlan() throws SQLException {
-            UpsertCompiler compiler = new UpsertCompiler(PhoenixStatement.this);
-            return compiler.compile(this);
+        public Integer getMaxLength() {
+            return null;
         }
-        
         @Override
-        public MutationPlan optimizePlan() throws SQLException {
-            return compilePlan();
+        public Integer getScale() {
+            return null;
         }
-    }
-    
-    private class ExecutableDeleteStatement extends DeleteStatement implements MutatableStatement {
-        private ExecutableDeleteStatement(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount) {
-            super(table, hint, whereNode, orderBy, limit, bindCount);
+        @Override
+        public SortOrder getSortOrder() {
+            return SortOrder.getDefault();
         }
+    };
 
-        @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("delete", this.toString());
+    private static final RowProjector EXPLAIN_PLAN_ROW_PROJECTOR = new RowProjector(Arrays.<ColumnProjector>asList(
+            new ExpressionProjector(EXPLAIN_PLAN_ALIAS, EXPLAIN_PLAN_TABLE_NAME, 
+                    new RowKeyColumnExpression(EXPLAIN_PLAN_DATUM,
+                            new RowKeyValueAccessor(Collections.<PDatum>singletonList(EXPLAIN_PLAN_DATUM), 0)), false)
+            ), 0, true);
+    private static class ExecutableExplainStatement extends ExplainStatement implements CompilableStatement {
+
+        public ExecutableExplainStatement(BindableStatement statement) {
+            super(statement);
         }
 
         @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
+        public CompilableStatement getStatement() {
+            return (CompilableStatement) super.getStatement();
         }
-
+        
         @Override
-        public int executeUpdate() throws SQLException {
-            lastUpdateOperation = UpdateOperation.DELETED;
-            return executeMutation(optimizePlan());
+        public int getBindCount() {
+            return getStatement().getBindCount();
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
+        public QueryPlan compilePlan(PhoenixStatement stmt) throws SQLException {
+            CompilableStatement compilableStmt = getStatement();
+            StatementPlan plan = compilableStmt.compilePlan(stmt);
+            List<String> planSteps = plan.getExplainPlan().getPlanSteps();
+            List<Tuple> tuples = Lists.newArrayListWithExpectedSize(planSteps.size());
+            for (String planStep : planSteps) {
+                Tuple tuple = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(PDataType.VARCHAR.toBytes(planStep), EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_COLUMN, MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY));
+                tuples.add(tuple);
+            }
+            final ResultIterator iterator = new MaterializedResultIterator(tuples);
+            return new QueryPlan() {
+
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("EXPLAIN PLAN"));
+                }
+
+                @Override
+                public ResultIterator iterator() throws SQLException {
+                    return iterator;
+                }
+
+                @Override
+                public long getEstimatedSize() {
+                    return 0;
+                }
+
+                @Override
+                public TableRef getTableRef() {
+                    return null;
+                }
+
+                @Override
+                public RowProjector getProjector() {
+                    return EXPLAIN_PLAN_ROW_PROJECTOR;
+                }
+
+                @Override
+                public Integer getLimit() {
+                    return null;
+                }
+
+                @Override
+                public OrderBy getOrderBy() {
+                    return OrderBy.EMPTY_ORDER_BY;
+                }
+
+                @Override
+                public GroupBy getGroupBy() {
+                    return GroupBy.EMPTY_GROUP_BY;
+                }
+
+                @Override
+                public List<KeyRange> getSplits() {
+                    return Collections.emptyList();
+                }
+
+                @Override
+                public StatementContext getContext() {
+                    return null;
+                }
+
+                @Override
+                public FilterableStatement getStatement() {
+                    return null;
+                }
+                
+            };
         }
+    }
 
+    private static class ExecutableUpsertStatement extends UpsertStatement implements CompilableStatement {
+        private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
+            super(table, hintNode, columns, values, select, bindCount);
+        }
+
+        @SuppressWarnings("unchecked")
         @Override
-        public MutationPlan compilePlan() throws SQLException {
-            DeleteCompiler compiler = new DeleteCompiler(PhoenixStatement.this);
+        public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException {
+            UpsertCompiler compiler = new UpsertCompiler(stmt);
             return compiler.compile(this);
         }
-        
+    }
+    
+    private static class ExecutableDeleteStatement extends DeleteStatement implements CompilableStatement {
+        private ExecutableDeleteStatement(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount) {
+            super(table, hint, whereNode, orderBy, limit, bindCount);
+        }
+
+        @SuppressWarnings("unchecked")
         @Override
-        public MutationPlan optimizePlan() throws SQLException {
-            return compilePlan();
+        public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException {
+            DeleteCompiler compiler = new DeleteCompiler(stmt);
+            return compiler.compile(this);
         }
     }
     
-    private class ExecutableCreateTableStatement extends CreateTableStatement implements ExecutableStatement {
+    private static class ExecutableCreateTableStatement extends CreateTableStatement implements CompilableStatement {
         ExecutableCreateTableStatement(TableName tableName, ListMultimap<String,Pair<String,Object>> props, List<ColumnDef> columnDefs,
                 PrimaryKeyConstraint pkConstraint, List<ParseNode> splitNodes, PTableType tableType, boolean ifNotExists,
                 TableName baseTableName, ParseNode tableTypeIdNode, int bindCount) {
             super(tableName, props, columnDefs, pkConstraint, splitNodes, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("CREATE TABLE", this.toString());
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            MutationPlan plan = optimizePlan();
-            MutationState state = plan.execute();
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.UPSERTED;
-            return lastUpdateCount;
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
-        }
-
-        @Override
-        public MutationPlan compilePlan() throws SQLException {
-            CreateTableCompiler compiler = new CreateTableCompiler(PhoenixStatement.this);
+        public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException {
+            CreateTableCompiler compiler = new CreateTableCompiler(stmt);
             return compiler.compile(this);
         }
-        
-        @Override
-        public MutationPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
     }
 
-    private class ExecutableCreateIndexStatement extends CreateIndexStatement implements ExecutableStatement {
+    private static class ExecutableCreateIndexStatement extends CreateIndexStatement implements CompilableStatement {
 
         public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, PrimaryKeyConstraint pkConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
                 ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, int bindCount) {
             super(indexName, dataTable, pkConstraint, includeColumns, splits, props, ifNotExists, bindCount);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("CREATE INDEX", this.toString());
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            MutationPlan plan = optimizePlan();
-            MutationState state = plan.execute();
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.UPSERTED;
-            return lastUpdateCount;
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
-        }
-
-        @Override
-        public MutationPlan compilePlan() throws SQLException {
-            CreateIndexCompiler compiler = new CreateIndexCompiler(PhoenixStatement.this);
+        public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException {
+            CreateIndexCompiler compiler = new CreateIndexCompiler(stmt);
             return compiler.compile(this);
         }
-        
-        @Override
-        public MutationPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
     }
     
-    private class ExecutableCreateSequenceStatement extends	CreateSequenceStatement implements ExecutableStatement {
+    private static class ExecutableCreateSequenceStatement extends	CreateSequenceStatement implements CompilableStatement {
 
 		public ExecutableCreateSequenceStatement(TableName sequenceName, ParseNode startWith, ParseNode incrementBy, ParseNode cacheSize, boolean ifNotExists, int bindCount) {
 			super(sequenceName, startWith, incrementBy, cacheSize, ifNotExists, bindCount);
 		}
 
-		@Override
-		public PhoenixResultSet executeQuery() throws SQLException {
-			throw new ExecuteQueryNotApplicableException("CREATE SEQUENCE",	this.toString());
-		}
-
-		@Override
-		public boolean execute() throws SQLException {
-		    executeUpdate();
-		    return false;
-		}
-
-		@Override
-		public int executeUpdate() throws SQLException {
-            MutationPlan plan = optimizePlan();
-            MutationState state = plan.execute();
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.UPSERTED;
-            return lastUpdateCount;
-		}
-
-		@Override
-		public ResultSetMetaData getResultSetMetaData() throws SQLException {
-			return null;
-		}
-
-		@Override
-		public MutationPlan compilePlan() throws SQLException {
-		    CreateSequenceCompiler compiler = new CreateSequenceCompiler(PhoenixStatement.this);
+		@SuppressWarnings("unchecked")
+        @Override
+		public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException {
+		    CreateSequenceCompiler compiler = new CreateSequenceCompiler(stmt);
             return compiler.compile(this);
 		}
-
-        @Override
-        public MutationPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
 	}
 
-    private class ExecutableDropSequenceStatement extends DropSequenceStatement implements ExecutableStatement {
+    private static class ExecutableDropSequenceStatement extends DropSequenceStatement implements CompilableStatement {
 
 
         public ExecutableDropSequenceStatement(TableName sequenceName, boolean ifExists, int bindCount) {
             super(sequenceName, ifExists, bindCount);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("DROP SEQUENCE", this.toString());
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            MutationPlan plan = optimizePlan();
-            MutationState state = plan.execute();
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.UPSERTED;
-            return lastUpdateCount;
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
-        }
-
-        @Override
-        public MutationPlan compilePlan() throws SQLException {
-            DropSequenceCompiler compiler = new DropSequenceCompiler(PhoenixStatement.this);
+        public MutationPlan compilePlan(PhoenixStatement stmt) throws SQLException {
+            DropSequenceCompiler compiler = new DropSequenceCompiler(stmt);
             return compiler.compile(this);
         }
-
-        @Override
-        public MutationPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
     }
 
-    private class ExecutableDropTableStatement extends DropTableStatement implements ExecutableStatement {
+    private static class ExecutableDropTableStatement extends DropTableStatement implements CompilableStatement {
 
         ExecutableDropTableStatement(TableName tableName, PTableType tableType, boolean ifExists) {
             super(tableName, tableType, ifExists);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("DROP TABLE", this.toString());
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            MetaDataClient client = new MetaDataClient(connection);
-            MutationState state = client.dropTable(this);
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.DELETED;
-            return lastUpdateCount;
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
-        }
-
-        @Override
-        public StatementPlan compilePlan() throws SQLException {
-            return new StatementPlan() {
+        public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException {
+            return new MutationPlan() {
 
                 @Override
                 public ParameterMetaData getParameterMetaData() {
@@ -555,51 +495,31 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public ExplainPlan getExplainPlan() throws SQLException {
                     return new ExplainPlan(Collections.singletonList("DROP TABLE"));
                 }
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return stmt.getConnection();
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getConnection());
+                    return client.dropTable(ExecutableDropTableStatement.this);
+                }
             };
         }
-        
-        @Override
-        public StatementPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
     }
 
-    private class ExecutableDropIndexStatement extends DropIndexStatement implements ExecutableStatement {
+    private static class ExecutableDropIndexStatement extends DropIndexStatement implements CompilableStatement {
 
         public ExecutableDropIndexStatement(NamedNode indexName, TableName tableName, boolean ifExists) {
             super(indexName, tableName, ifExists);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("DROP INDEX", this.toString());
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            MetaDataClient client = new MetaDataClient(connection);
-            MutationState state = client.dropIndex(this);
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.DELETED;
-            return lastUpdateCount;
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
-        }
-
-        @Override
-        public StatementPlan compilePlan() throws SQLException {
-            return new StatementPlan() {
+        public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException {
+            return new MutationPlan() {
                 
                 @Override
                 public ParameterMetaData getParameterMetaData() {
@@ -610,51 +530,31 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public ExplainPlan getExplainPlan() throws SQLException {
                     return new ExplainPlan(Collections.singletonList("DROP INDEX"));
                 }
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return stmt.getConnection();
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getConnection());
+                    return client.dropIndex(ExecutableDropIndexStatement.this);
+                }
             };
         }
-        
-        @Override
-        public StatementPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
     }
 
-    private class ExecutableAlterIndexStatement extends AlterIndexStatement implements ExecutableStatement {
+    private static class ExecutableAlterIndexStatement extends AlterIndexStatement implements CompilableStatement {
 
         public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
             super(indexTableNode, dataTableName, ifExists, state);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("ALTER INDEX", this.toString());
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            MetaDataClient client = new MetaDataClient(connection);
-            MutationState state = client.alterIndex(this);
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.UPSERTED;
-            return lastUpdateCount;
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
-        }
-
-        @Override
-        public StatementPlan compilePlan() throws SQLException {
-            return new StatementPlan() {
+        public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException {
+            return new MutationPlan() {
                 
                 @Override
                 public ParameterMetaData getParameterMetaData() {
@@ -665,51 +565,31 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public ExplainPlan getExplainPlan() throws SQLException {
                     return new ExplainPlan(Collections.singletonList("ALTER INDEX"));
                 }
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return stmt.getConnection();
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getConnection());
+                    return client.alterIndex(ExecutableAlterIndexStatement.this);
+                }
             };
         }
-        
-        @Override
-        public StatementPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
     }
 
-    private class ExecutableAddColumnStatement extends AddColumnStatement implements ExecutableStatement {
+    private static class ExecutableAddColumnStatement extends AddColumnStatement implements CompilableStatement {
 
         ExecutableAddColumnStatement(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, Map<String, Object> props) {
             super(table, tableType, columnDefs, ifNotExists, props);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("ALTER TABLE", this.toString());
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            MetaDataClient client = new MetaDataClient(connection);
-            MutationState state = client.addColumn(this);
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.UPSERTED;
-            return lastUpdateCount;
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
-        }
-
-        @Override
-        public StatementPlan compilePlan() throws SQLException {
-            return new StatementPlan() {
+        public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException {
+            return new MutationPlan() {
 
                 @Override
                 public ParameterMetaData getParameterMetaData() {
@@ -720,51 +600,31 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public ExplainPlan getExplainPlan() throws SQLException {
                     return new ExplainPlan(Collections.singletonList("ALTER " + getTableType() + " ADD COLUMN"));
                 }
+
+                @Override
+                public PhoenixConnection getConnection() {
+                    return stmt.getConnection();
+                }
+
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getConnection());
+                    return client.addColumn(ExecutableAddColumnStatement.this);
+                }
             };
         }
-        
-        @Override
-        public StatementPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
     }
 
-    private class ExecutableDropColumnStatement extends DropColumnStatement implements ExecutableStatement {
+    private static class ExecutableDropColumnStatement extends DropColumnStatement implements CompilableStatement {
 
         ExecutableDropColumnStatement(NamedTableNode table, PTableType tableType, List<ColumnName> columnRefs, boolean ifExists) {
             super(table, tableType, columnRefs, ifExists);
         }
 
+        @SuppressWarnings("unchecked")
         @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            throw new ExecuteQueryNotApplicableException("ALTER TABLE", this.toString());
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeUpdate();
-            return false;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            MetaDataClient client = new MetaDataClient(connection);
-            MutationState state = client.dropColumn(this);
-            lastQueryPlan = null;
-            lastResultSet = null;
-            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
-            lastUpdateOperation = UpdateOperation.UPSERTED;
-            return lastUpdateCount;
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return null;
-        }
-
-        @Override
-        public StatementPlan compilePlan() throws SQLException {
-            return new StatementPlan() {
+        public MutationPlan compilePlan(final PhoenixStatement stmt) throws SQLException {
+            return new MutationPlan() {
 
                 @Override
                 public ParameterMetaData getParameterMetaData() {
@@ -775,110 +635,22 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
                 public ExplainPlan getExplainPlan() throws SQLException {
                     return new ExplainPlan(Collections.singletonList("ALTER " + getTableType() + " DROP COLUMN"));
                 }
-            };
-        }
-        
-        @Override
-        public StatementPlan optimizePlan() throws SQLException {
-            return compilePlan();
-        }
-    }
-
-    private static final byte[] EXPLAIN_PLAN_FAMILY = QueryConstants.SINGLE_COLUMN_FAMILY;
-    private static final byte[] EXPLAIN_PLAN_COLUMN = PDataType.VARCHAR.toBytes("Plan");
-    private static final String EXPLAIN_PLAN_ALIAS = "PLAN";
-    private static final String EXPLAIN_PLAN_TABLE_NAME = "PLAN_TABLE";
-    private static final PDatum EXPLAIN_PLAN_DATUM = new PDatum() {
-        @Override
-        public boolean isNullable() {
-            return false;
-        }
-        @Override
-        public PDataType getDataType() {
-            return PDataType.VARCHAR;
-        }
-        @Override
-        public Integer getByteSize() {
-            return null;
-        }
-        @Override
-        public Integer getMaxLength() {
-            return null;
-        }
-        @Override
-        public Integer getScale() {
-            return null;
-        }
-		@Override
-		public SortOrder getSortOrder() {
-			return SortOrder.getDefault();
-		}
-    };
-    private static final RowProjector EXPLAIN_PLAN_ROW_PROJECTOR = new RowProjector(Arrays.<ColumnProjector>asList(
-            new ExpressionProjector(EXPLAIN_PLAN_ALIAS, EXPLAIN_PLAN_TABLE_NAME, 
-                    new RowKeyColumnExpression(EXPLAIN_PLAN_DATUM,
-                            new RowKeyValueAccessor(Collections.<PDatum>singletonList(EXPLAIN_PLAN_DATUM), 0)), false)
-            ), 0, true);
-    private class ExecutableExplainStatement extends ExplainStatement implements ExecutableStatement {
 
-        public ExecutableExplainStatement(BindableStatement statement) {
-            super(statement);
-        }
-
-        @Override
-        public ExecutableStatement getStatement() {
-            return (ExecutableStatement) super.getStatement();
-        }
-        
-        @Override
-        public int getBindCount() {
-            return getStatement().getBindCount();
-        }
-
-        @Override
-        public PhoenixResultSet executeQuery() throws SQLException {
-            StatementPlan plan = getStatement().optimizePlan();
-            List<String> planSteps = plan.getExplainPlan().getPlanSteps();
-            List<Tuple> tuples = Lists.newArrayListWithExpectedSize(planSteps.size());
-            for (String planStep : planSteps) {
-                Tuple tuple = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(PDataType.VARCHAR.toBytes(planStep), EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_COLUMN, MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY));
-                tuples.add(tuple);
-            }
-            PhoenixResultSet rs = new PhoenixResultSet(new MaterializedResultIterator(tuples),EXPLAIN_PLAN_ROW_PROJECTOR, new PhoenixStatement(connection));
-            lastResultSet = rs;
-            lastQueryPlan = null;
-            lastUpdateCount = NO_UPDATE;
-            return rs;
-        }
-
-        @Override
-        public boolean execute() throws SQLException {
-            executeQuery();
-            return true;
-        }
-
-        @Override
-        public int executeUpdate() throws SQLException {
-            throw new ExecuteUpdateNotApplicableException("ALTER TABLE", this.toString());
-        }
-
-        @Override
-        public ResultSetMetaData getResultSetMetaData() throws SQLException {
-            return new PhoenixResultSetMetaData(connection, EXPLAIN_PLAN_ROW_PROJECTOR);
-        }
+                @Override
+                public PhoenixConnection getConnection() {
+                    return stmt.getConnection();
+                }
 
-        @Override
-        public StatementPlan compilePlan() throws SQLException {
-            return StatementPlan.EMPTY_PLAN;
-        }
-        
-        @Override
-        public StatementPlan optimizePlan() throws SQLException {
-            return compilePlan();
+                @Override
+                public MutationState execute() throws SQLException {
+                    MetaDataClient client = new MetaDataClient(getConnection());
+                    return client.dropColumn(ExecutableDropColumnStatement.this);
+                }
+            };
         }
     }
 
-    protected class ExecutableNodeFactory extends ParseNodeFactory {
+    protected static class ExecutableNodeFactory extends ParseNodeFactory {
         @Override
         public ExecutableSelectStatement select(List<? extends TableNode> from, HintNode hint, boolean isDistinct, List<AliasedNode> select,
                                                 ParseNode where, List<ParseNode> groupBy, ParseNode having,
@@ -958,13 +730,13 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         }
         
         @Override
-        public ExecutableStatement nextStatement(ParseNodeFactory nodeFactory) throws SQLException {
-            return (ExecutableStatement) super.nextStatement(nodeFactory);
+        public CompilableStatement nextStatement(ParseNodeFactory nodeFactory) throws SQLException {
+            return (CompilableStatement) super.nextStatement(nodeFactory);
         }
 
         @Override
-        public ExecutableStatement parseStatement() throws SQLException {
-            return (ExecutableStatement) super.parseStatement();
+        public CompilableStatement parseStatement() throws SQLException {
+            return (CompilableStatement) super.parseStatement();
         }
     }
     
@@ -972,18 +744,43 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         return connection.getFormatter(type);
     }
     
+    protected final List<PhoenixPreparedStatement> batch = Lists.newArrayList();
+    
     @Override
     public void addBatch(String sql) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+        batch.add(new PhoenixPreparedStatement(connection, sql));
     }
 
     @Override
-    public void cancel() throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+    public void clearBatch() throws SQLException {
+        batch.clear();
     }
 
+    /**
+     * Execute the current batch of statements. If any exception occurs
+     * during execution, a {@link org.apache.phoenix.exception.BatchUpdateExecution}
+     * is thrown which includes the index of the statement within the
+     * batch when the exception occurred.
+     */
     @Override
-    public void clearBatch() throws SQLException {
+    public int[] executeBatch() throws SQLException {
+        int i = 0;
+        try {
+            int[] returnCodes = new int [batch.size()];
+            for (i = 0; i < returnCodes.length; i++) {
+                PhoenixPreparedStatement statement = batch.get(i);
+                returnCodes[i] = statement.execute(true) ? Statement.SUCCESS_NO_INFO : statement.getUpdateCount();
+            }
+            // If we make it all the way through, clear the batch
+            clearBatch();
+            return returnCodes;
+        } catch (Throwable t) {
+            throw new BatchUpdateExecution(t,i);
+        }
+    }
+
+    @Override
+    public void cancel() throws SQLException {
         throw new SQLFeatureNotSupportedException();
     }
 
@@ -1013,57 +810,79 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         return Collections.<Object>emptyList();
     }
     
-    protected ExecutableStatement parseStatement(String sql) throws SQLException {
+    protected CompilableStatement parseStatement(String sql) throws SQLException {
         PhoenixStatementParser parser = null;
         try {
             parser = new PhoenixStatementParser(sql, new ExecutableNodeFactory());
         } catch (IOException e) {
             throw ServerUtil.parseServerException(e);
         }
-        ExecutableStatement statement = parser.parseStatement();
+        CompilableStatement statement = parser.parseStatement();
         return statement;
     }
     
-    @Override
-    public boolean execute(String sql) throws SQLException {
-        return parseStatement(sql).execute();
-    }
-
     public QueryPlan optimizeQuery(String sql) throws SQLException {
-        return (QueryPlan)parseStatement(sql).optimizePlan();
+        QueryPlan plan = compileQuery(sql);
+        return connection.getQueryServices().getOptimizer().optimize(plan, this);
     }
 
     public QueryPlan compileQuery(String sql) throws SQLException {
-        return (QueryPlan)parseStatement(sql).compilePlan();
+        CompilableStatement stmt = parseStatement(sql);
+        if (stmt.getOperation().isMutation()) {
+            throw new ExecuteQueryNotApplicableException(sql);
+        }
+        return stmt.compilePlan(this);
     }
 
     @Override
     public ResultSet executeQuery(String sql) throws SQLException {
-        return parseStatement(sql).executeQuery();
+        CompilableStatement stmt = parseStatement(sql);
+        if (stmt.getOperation().isMutation()) {
+            throw new ExecuteQueryNotApplicableException(sql);
+        }
+        return executeQuery(stmt);
     }
 
     @Override
     public int executeUpdate(String sql) throws SQLException {
-        return parseStatement(sql).executeUpdate();
+        CompilableStatement stmt = parseStatement(sql);
+        if (!stmt.getOperation().isMutation) {
+            throw new ExecuteUpdateNotApplicableException(sql);
+        }
+        if (!batch.isEmpty()) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
+            .build().buildException();
+        }
+        return executeMutation(stmt);
     }
 
     @Override
-    public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
-        throw new SQLFeatureNotSupportedException();
+    public boolean execute(String sql) throws SQLException {
+        CompilableStatement stmt = parseStatement(sql);
+        if (stmt.getOperation().isMutation()) {
+            if (!batch.isEmpty()) {
+                throw new SQLExceptionInfo.Builder(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH)
+                .build().buildException();
+            }
+            executeMutation(stmt);
+            return false;
+        }
+        executeQuery(stmt);
+        return true;
     }
 
     @Override
-    public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+    public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
         throw new SQLFeatureNotSupportedException();
     }
 
     @Override
-    public boolean execute(String sql, String[] columnNames) throws SQLException {
+    public boolean execute(String sql, int[] columnIndexes) throws SQLException {
         throw new SQLFeatureNotSupportedException();
     }
 
     @Override
-    public int[] executeBatch() throws SQLException {
+    public boolean execute(String sql, String[] columnNames) throws SQLException {
         throw new SQLFeatureNotSupportedException();
     }
 
@@ -1129,13 +948,13 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
 
     // For testing
     public QueryPlan getQueryPlan() {
-        return lastQueryPlan;
+        return getLastQueryPlan();
     }
     
     @Override
     public ResultSet getResultSet() throws SQLException {
-        ResultSet rs = lastResultSet;
-        lastResultSet = null;
+        ResultSet rs = getLastResultSet();
+        setLastResultSet(null);
         return rs;
     }
 
@@ -1155,17 +974,17 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         return ResultSet.TYPE_FORWARD_ONLY;
     }
 
-    public UpdateOperation getUpdateOperation() {
-        return lastUpdateOperation;
+    public Operation getUpdateOperation() {
+        return getLastUpdateOperation();
     }
     
     @Override
     public int getUpdateCount() throws SQLException {
-        int updateCount = lastUpdateCount;
+        int updateCount = getLastUpdateCount();
         // Only first call can get the update count, otherwise
         // some SQL clients get into an infinite loop when an
         // update occurs.
-        lastUpdateCount = NO_UPDATE;
+        setLastUpdateCount(NO_UPDATE);
         return updateCount;
     }
 
@@ -1255,4 +1074,36 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
     public boolean isCloseOnCompletion() throws SQLException {
         throw new SQLFeatureNotSupportedException();
     }
+
+    private PhoenixResultSet getLastResultSet() {
+        return lastResultSet;
+    }
+
+    private void setLastResultSet(PhoenixResultSet lastResultSet) {
+        this.lastResultSet = lastResultSet;
+    }
+
+    private int getLastUpdateCount() {
+        return lastUpdateCount;
+    }
+
+    private void setLastUpdateCount(int lastUpdateCount) {
+        this.lastUpdateCount = lastUpdateCount;
+    }
+
+    private Operation getLastUpdateOperation() {
+        return lastUpdateOperation;
+    }
+
+    private void setLastUpdateOperation(Operation lastUpdateOperation) {
+        this.lastUpdateOperation = lastUpdateOperation;
+    }
+
+    private QueryPlan getLastQueryPlan() {
+        return lastQueryPlan;
+    }
+
+    private void setLastQueryPlan(QueryPlan lastQueryPlan) {
+        this.lastQueryPlan = lastQueryPlan;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 2aa45d7..fea9b62 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -38,6 +38,13 @@ public class QueryOptimizer {
         this.useIndexes = this.services.getProps().getBoolean(QueryServices.USE_INDEXES_ATTRIB, QueryServicesOptions.DEFAULT_USE_INDEXES);
     }
 
+    public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement) throws SQLException {
+        if (dataPlan.getTableRef() == null) {
+            return dataPlan;
+        }
+        return optimize(dataPlan, statement, Collections.<PColumn>emptyList(), null);
+    }
+
     public QueryPlan optimize(SelectStatement select, PhoenixStatement statement) throws SQLException {
         return optimize(select, statement, Collections.<PColumn>emptyList(), null);
     }
@@ -45,13 +52,17 @@ public class QueryOptimizer {
     public QueryPlan optimize(SelectStatement select, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
         QueryCompiler compiler = new QueryCompiler(statement, targetColumns, parallelIteratorFactory);
         QueryPlan dataPlan = compiler.compile(select);
+        return optimize(dataPlan, statement, targetColumns, parallelIteratorFactory);
+    }
+    
+    public QueryPlan optimize(QueryPlan dataPlan, PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
+        // Get the statement as it's been normalized now
+        // TODO: the recompile for the index tables could skip the normalize step
+        SelectStatement select = (SelectStatement)dataPlan.getStatement();
         // TODO: consider not even compiling index plans if we have a point lookup
         if (!useIndexes || select.getFrom().size() > 1) {
             return dataPlan;
         }
-        // Get the statement as it's been normalized now
-        // TODO: the recompile for the index tables could skip the normalize step
-        select = (SelectStatement)dataPlan.getStatement();
         PTable dataTable = dataPlan.getTableRef().getTable();
         List<PTable>indexes = Lists.newArrayList(dataTable.getIndexes());
         if (indexes.isEmpty() || dataPlan.getTableRef().hasDynamicCols() || select.getHint().hasHint(Hint.NO_INDEX)) {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java
index 16d0b65..83f6821 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/BindableStatement.java
@@ -19,7 +19,10 @@
  */
 package org.apache.phoenix.parse;
 
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
 
 public interface BindableStatement {
     public int getBindCount();
+    public Operation getOperation();
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSequenceStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSequenceStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSequenceStatement.java
index ae6370a..2f8dd64 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSequenceStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateSequenceStatement.java
@@ -19,7 +19,7 @@
  */
 package org.apache.phoenix.parse;
 
-public class CreateSequenceStatement implements BindableStatement {
+public class CreateSequenceStatement extends MutableStatement {
 
 	private final TableName sequenceName;
 	private final ParseNode startWith;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java
index 74f1c11..71c0c8c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateTableStatement.java
@@ -23,14 +23,14 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.schema.PTableType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.ListMultimap;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.schema.PTableType;
 
-public class CreateTableStatement implements BindableStatement {
+public class CreateTableStatement extends MutableStatement {
     private final TableName tableName;
     private final PTableType tableType;
     private final List<ColumnDef> columns;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
index 377fa12..a1ffcaa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DeleteStatement.java
@@ -22,6 +22,8 @@ package org.apache.phoenix.parse;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
 public class DeleteStatement extends SingleTableSQLStatement implements FilterableStatement {
     private final ParseNode whereNode;
     private final List<OrderByNode> orderBy;
@@ -66,4 +68,8 @@ public class DeleteStatement extends SingleTableSQLStatement implements Filterab
         return false;
     }
 
+    @Override
+    public Operation getOperation() {
+        return Operation.DELETE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropIndexStatement.java
index 5dc0df9..0048ad0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropIndexStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropIndexStatement.java
@@ -19,7 +19,9 @@
  */
 package org.apache.phoenix.parse;
 
-public class DropIndexStatement implements BindableStatement {
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public class DropIndexStatement extends MutableStatement {
     private final TableName tableName;
     private final NamedNode indexName;
     private final boolean ifExists;
@@ -47,4 +49,8 @@ public class DropIndexStatement implements BindableStatement {
         return ifExists;
     }
 
+    @Override
+    public Operation getOperation() {
+        return Operation.DELETE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/DropSequenceStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropSequenceStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropSequenceStatement.java
index 060f115..7dae1b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropSequenceStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropSequenceStatement.java
@@ -19,7 +19,9 @@
  */
 package org.apache.phoenix.parse;
 
-public class DropSequenceStatement implements BindableStatement {
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public class DropSequenceStatement extends MutableStatement {
 
     private final TableName sequenceName;
     private final boolean ifExists;
@@ -43,4 +45,9 @@ public class DropSequenceStatement implements BindableStatement {
     public boolean ifExists() {
         return ifExists;
     }
+    
+    @Override
+    public Operation getOperation() {
+        return Operation.DELETE;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
index 2d35257..47eedd2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DropTableStatement.java
@@ -19,9 +19,10 @@
  */
 package org.apache.phoenix.parse;
 
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.schema.PTableType;
 
-public class DropTableStatement implements BindableStatement {
+public class DropTableStatement extends MutableStatement {
     private final TableName tableName;
     private final boolean ifExists;
     private final PTableType tableType;
@@ -48,4 +49,9 @@ public class DropTableStatement implements BindableStatement {
     public boolean ifExists() {
         return ifExists;
     }
+    
+    @Override
+    public Operation getOperation() {
+        return Operation.DELETE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java
index 0ce09d5..01b9f29 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ExplainStatement.java
@@ -19,6 +19,8 @@
  */
 package org.apache.phoenix.parse;
 
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
 public class ExplainStatement implements BindableStatement {
     private final BindableStatement statement;
     
@@ -34,4 +36,9 @@ public class ExplainStatement implements BindableStatement {
     public int getBindCount() {
         return statement.getBindCount();
     }
+
+    @Override
+    public Operation getOperation() {
+        return Operation.QUERY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java
new file mode 100644
index 0000000..833c5c6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/MutableStatement.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
+
+public abstract class MutableStatement implements BindableStatement {
+
+    @Override
+    public Operation getOperation() {
+        return Operation.UPSERT;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index 07265ff..12c560b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.phoenix.expression.function.CountAggregateFunction;
+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
 import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunctionInfo;
 
@@ -176,4 +177,9 @@ public class SelectStatement implements FilterableStatement {
     public boolean isAggregate() {
         return isAggregate;
     }
+
+    @Override
+    public Operation getOperation() {
+        return Operation.QUERY;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/main/java/org/apache/phoenix/parse/SingleTableSQLStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SingleTableSQLStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SingleTableSQLStatement.java
index c2aa6e1..0f1d7ad 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SingleTableSQLStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SingleTableSQLStatement.java
@@ -19,7 +19,7 @@
  */
 package org.apache.phoenix.parse;
 
-public abstract class SingleTableSQLStatement implements BindableStatement {
+public abstract class SingleTableSQLStatement extends MutableStatement {
     private final NamedTableNode table;
     private final int bindCount;
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/0b3d9487/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index ac2c458..a8857f0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -33,6 +33,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
@@ -1310,4 +1311,43 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
         assertFalse(scan.getCacheBlocks());
     }
 
+    @Test
+    public void testExecuteWithNonEmptyBatch() throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.addBatch("SELECT * FROM atable");
+            stmt.execute("UPSERT INTO atable VALUES('000000000000000','000000000000000')");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH.getErrorCode(), e.getErrorCode());
+        }
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.addBatch("SELECT * FROM atable");
+            stmt.executeUpdate("UPSERT INTO atable VALUES('000000000000000','000000000000000')");
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH.getErrorCode(), e.getErrorCode());
+        }
+        try {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO atable VALUES('000000000000000','000000000000000')");
+            stmt.addBatch();
+            stmt.execute();
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH.getErrorCode(), e.getErrorCode());
+        }
+        conn.close();
+        try {
+            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM atable");
+            stmt.addBatch();
+            stmt.executeUpdate();
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.EXECUTE_UPDATE_WITH_NON_EMPTY_BATCH.getErrorCode(), e.getErrorCode());
+        }
+        conn.close();
+    }
+
 }