You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:37 UTC

[21/51] [partial] Initial commit of master branch from github

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
new file mode 100644
index 0000000..8592f49
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -0,0 +1,1258 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.text.Format;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.CreateIndexCompiler;
+import org.apache.phoenix.compile.CreateSequenceCompiler;
+import org.apache.phoenix.compile.CreateTableCompiler;
+import org.apache.phoenix.compile.DeleteCompiler;
+import org.apache.phoenix.compile.DropSequenceCompiler;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.ExpressionProjector;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.QueryCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.compile.UpsertCompiler;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.iterate.MaterializedResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.parse.AddColumnStatement;
+import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.AlterIndexStatement;
+import org.apache.phoenix.parse.BindableStatement;
+import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.CreateSequenceStatement;
+import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DeleteStatement;
+import org.apache.phoenix.parse.DropColumnStatement;
+import org.apache.phoenix.parse.DropIndexStatement;
+import org.apache.phoenix.parse.DropSequenceStatement;
+import org.apache.phoenix.parse.DropTableStatement;
+import org.apache.phoenix.parse.ExplainStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.LimitNode;
+import org.apache.phoenix.parse.NamedNode;
+import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.OrderByNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.PrimaryKeyConstraint;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.parse.UpsertStatement;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
+import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ * 
+ * JDBC Statement implementation of Phoenix.
+ * Currently only the following methods are supported:
+ * - {@link #executeQuery(String)}
+ * - {@link #executeUpdate(String)}
+ * - {@link #execute(String)}
+ * - {@link #getResultSet()}
+ * - {@link #getUpdateCount()}
+ * - {@link #close()}
+ * The Statement only supports the following options:
+ * - ResultSet.FETCH_FORWARD
+ * - ResultSet.TYPE_FORWARD_ONLY
+ * - ResultSet.CLOSE_CURSORS_AT_COMMIT
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
+    public enum UpdateOperation {
+        DELETED("deleted"),
+        UPSERTED("upserted");
+        
+        private final String toString;
+        UpdateOperation(String toString) {
+            this.toString = toString;
+        }
+        
+        @Override
+        public String toString() {
+            return toString;
+        }
+        };
+
+    protected final PhoenixConnection connection;
+    private static final int NO_UPDATE = -1;
+    private List<PhoenixResultSet> resultSets = new ArrayList<PhoenixResultSet>();
+    private QueryPlan lastQueryPlan;
+    private PhoenixResultSet lastResultSet;
+    private int lastUpdateCount = NO_UPDATE;
+    private UpdateOperation lastUpdateOperation;
+    private boolean isClosed = false;
+    private ResultSetMetaData resultSetMetaData;
+    private int maxRows;
+    
+    
+    public PhoenixStatement(PhoenixConnection connection) {
+        this.connection = connection;
+    }
+    
+    protected List<PhoenixResultSet> getResultSets() {
+        return resultSets;
+    }
+    
+    protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector) throws SQLException {
+        return new PhoenixResultSet(iterator, projector, PhoenixStatement.this);
+    }
+    
+    protected static interface ExecutableStatement extends BindableStatement {
+        public boolean execute() throws SQLException;
+        public int executeUpdate() throws SQLException;
+        public PhoenixResultSet executeQuery() throws SQLException;
+        public ResultSetMetaData getResultSetMetaData() throws SQLException;
+        public StatementPlan optimizePlan() throws SQLException;
+        public StatementPlan compilePlan() throws SQLException;
+    }
+    
+    protected static interface MutatableStatement extends ExecutableStatement {
+        @Override
+        public MutationPlan optimizePlan() throws SQLException;
+    }
+    
+    private class ExecutableSelectStatement extends SelectStatement implements ExecutableStatement {
+        private ExecutableSelectStatement(List<? extends TableNode> from, HintNode hint, boolean isDistinct, List<AliasedNode> select, ParseNode where,
+                List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
+            super(from, hint, isDistinct, select, where, groupBy, having, orderBy, limit, bindCount, isAggregate);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            QueryPlan plan = optimizePlan();
+            PhoenixResultSet rs = newResultSet(plan.iterator(), plan.getProjector());
+            resultSets.add(rs);
+            lastResultSet = rs;
+            lastUpdateCount = NO_UPDATE;
+            lastUpdateOperation = null;
+            return rs;
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeQuery();
+            return true;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            throw new ExecuteUpdateNotApplicableException(this.toString());
+        }
+
+        @Override
+        public QueryPlan optimizePlan() throws SQLException {
+            return lastQueryPlan = connection.getQueryServices().getOptimizer().optimize(this, PhoenixStatement.this);
+        }
+        
+        @Override
+        public StatementPlan compilePlan() throws SQLException {
+            return new QueryCompiler(PhoenixStatement.this).compile(this);
+        }
+        
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            if (resultSetMetaData == null) {
+                // Just compile top level query without optimizing to get ResultSetMetaData
+                QueryPlan plan = new QueryCompiler(PhoenixStatement.this).compile(this);
+                resultSetMetaData = new PhoenixResultSetMetaData(connection, plan.getProjector());
+            }
+            return resultSetMetaData;
+        }
+    }
+    
+    private int executeMutation(MutationPlan plan) throws SQLException {
+        // Note that the upsert select statements will need to commit any open transaction here,
+        // since they'd update data directly from coprocessors, and should thus operate on
+        // the latest state
+        MutationState state = plan.execute();
+        connection.getMutationState().join(state);
+        if (connection.getAutoCommit()) {
+            connection.commit();
+        }
+        lastResultSet = null;
+        lastQueryPlan = null;
+        // Unfortunately, JDBC uses an int for update count, so we
+        // just max out at Integer.MAX_VALUE
+        long updateCount = state.getUpdateCount();
+        lastUpdateCount = (int)Math.min(Integer.MAX_VALUE, updateCount);
+        return lastUpdateCount;
+    }
+    
+    private class ExecutableUpsertStatement extends UpsertStatement implements MutatableStatement {
+        private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
+            super(table, hintNode, columns, values, select, bindCount);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("upsert", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            lastUpdateOperation = UpdateOperation.UPSERTED;
+            return executeMutation(optimizePlan());
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public MutationPlan compilePlan() throws SQLException {
+            UpsertCompiler compiler = new UpsertCompiler(PhoenixStatement.this);
+            return compiler.compile(this);
+        }
+        
+        @Override
+        public MutationPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+    
+    private class ExecutableDeleteStatement extends DeleteStatement implements MutatableStatement {
+        private ExecutableDeleteStatement(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount) {
+            super(table, hint, whereNode, orderBy, limit, bindCount);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("delete", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            lastUpdateOperation = UpdateOperation.DELETED;
+            return executeMutation(optimizePlan());
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public MutationPlan compilePlan() throws SQLException {
+            DeleteCompiler compiler = new DeleteCompiler(PhoenixStatement.this);
+            return compiler.compile(this);
+        }
+        
+        @Override
+        public MutationPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+    
+    private class ExecutableCreateTableStatement extends CreateTableStatement implements ExecutableStatement {
+        ExecutableCreateTableStatement(TableName tableName, ListMultimap<String,Pair<String,Object>> props, List<ColumnDef> columnDefs,
+                PrimaryKeyConstraint pkConstraint, List<ParseNode> splitNodes, PTableType tableType, boolean ifNotExists,
+                TableName baseTableName, ParseNode tableTypeIdNode, int bindCount) {
+            super(tableName, props, columnDefs, pkConstraint, splitNodes, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("CREATE TABLE", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            MutationPlan plan = optimizePlan();
+            MutationState state = plan.execute();
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.UPSERTED;
+            return lastUpdateCount;
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public MutationPlan compilePlan() throws SQLException {
+            CreateTableCompiler compiler = new CreateTableCompiler(PhoenixStatement.this);
+            return compiler.compile(this);
+        }
+        
+        @Override
+        public MutationPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+
+    private class ExecutableCreateIndexStatement extends CreateIndexStatement implements ExecutableStatement {
+
+        public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, PrimaryKeyConstraint pkConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
+                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, int bindCount) {
+            super(indexName, dataTable, pkConstraint, includeColumns, splits, props, ifNotExists, bindCount);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("CREATE INDEX", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            MutationPlan plan = optimizePlan();
+            MutationState state = plan.execute();
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.UPSERTED;
+            return lastUpdateCount;
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public MutationPlan compilePlan() throws SQLException {
+            CreateIndexCompiler compiler = new CreateIndexCompiler(PhoenixStatement.this);
+            return compiler.compile(this);
+        }
+        
+        @Override
+        public MutationPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+    
+    private class ExecutableCreateSequenceStatement extends	CreateSequenceStatement implements ExecutableStatement {
+
+		public ExecutableCreateSequenceStatement(TableName sequenceName, ParseNode startWith, ParseNode incrementBy, ParseNode cacheSize, boolean ifNotExists, int bindCount) {
+			super(sequenceName, startWith, incrementBy, cacheSize, ifNotExists, bindCount);
+		}
+
+		@Override
+		public PhoenixResultSet executeQuery() throws SQLException {
+			throw new ExecuteQueryNotApplicableException("CREATE SEQUENCE",	this.toString());
+		}
+
+		@Override
+		public boolean execute() throws SQLException {
+		    executeUpdate();
+		    return false;
+		}
+
+		@Override
+		public int executeUpdate() throws SQLException {
+            MutationPlan plan = optimizePlan();
+            MutationState state = plan.execute();
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.UPSERTED;
+            return lastUpdateCount;
+		}
+
+		@Override
+		public ResultSetMetaData getResultSetMetaData() throws SQLException {
+			return null;
+		}
+
+		@Override
+		public MutationPlan compilePlan() throws SQLException {
+		    CreateSequenceCompiler compiler = new CreateSequenceCompiler(PhoenixStatement.this);
+            return compiler.compile(this);
+		}
+
+        @Override
+        public MutationPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+	}
+
+    private class ExecutableDropSequenceStatement extends DropSequenceStatement implements ExecutableStatement {
+
+
+        public ExecutableDropSequenceStatement(TableName sequenceName, boolean ifExists, int bindCount) {
+            super(sequenceName, ifExists, bindCount);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("DROP SEQUENCE", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            MutationPlan plan = optimizePlan();
+            MutationState state = plan.execute();
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.UPSERTED;
+            return lastUpdateCount;
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public MutationPlan compilePlan() throws SQLException {
+            DropSequenceCompiler compiler = new DropSequenceCompiler(PhoenixStatement.this);
+            return compiler.compile(this);
+        }
+
+        @Override
+        public MutationPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+
+    private class ExecutableDropTableStatement extends DropTableStatement implements ExecutableStatement {
+
+        ExecutableDropTableStatement(TableName tableName, PTableType tableType, boolean ifExists) {
+            super(tableName, tableType, ifExists);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("DROP TABLE", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            MetaDataClient client = new MetaDataClient(connection);
+            MutationState state = client.dropTable(this);
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.DELETED;
+            return lastUpdateCount;
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public StatementPlan compilePlan() throws SQLException {
+            return new StatementPlan() {
+
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("DROP TABLE"));
+                }
+            };
+        }
+        
+        @Override
+        public StatementPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+
+    private class ExecutableDropIndexStatement extends DropIndexStatement implements ExecutableStatement {
+
+        public ExecutableDropIndexStatement(NamedNode indexName, TableName tableName, boolean ifExists) {
+            super(indexName, tableName, ifExists);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("DROP INDEX", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            MetaDataClient client = new MetaDataClient(connection);
+            MutationState state = client.dropIndex(this);
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.DELETED;
+            return lastUpdateCount;
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public StatementPlan compilePlan() throws SQLException {
+            return new StatementPlan() {
+                
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                }
+                
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("DROP INDEX"));
+                }
+            };
+        }
+        
+        @Override
+        public StatementPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+
+    private class ExecutableAlterIndexStatement extends AlterIndexStatement implements ExecutableStatement {
+
+        public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
+            super(indexTableNode, dataTableName, ifExists, state);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("ALTER INDEX", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            MetaDataClient client = new MetaDataClient(connection);
+            MutationState state = client.alterIndex(this);
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.UPSERTED;
+            return lastUpdateCount;
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public StatementPlan compilePlan() throws SQLException {
+            return new StatementPlan() {
+                
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                }
+                
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("ALTER INDEX"));
+                }
+            };
+        }
+        
+        @Override
+        public StatementPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+
+    private class ExecutableAddColumnStatement extends AddColumnStatement implements ExecutableStatement {
+
+        ExecutableAddColumnStatement(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, Map<String, Object> props) {
+            super(table, tableType, columnDefs, ifNotExists, props);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("ALTER TABLE", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            MetaDataClient client = new MetaDataClient(connection);
+            MutationState state = client.addColumn(this);
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.UPSERTED;
+            return lastUpdateCount;
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public StatementPlan compilePlan() throws SQLException {
+            return new StatementPlan() {
+
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("ALTER " + getTableType() + " ADD COLUMN"));
+                }
+            };
+        }
+        
+        @Override
+        public StatementPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+
+    private class ExecutableDropColumnStatement extends DropColumnStatement implements ExecutableStatement {
+
+        ExecutableDropColumnStatement(NamedTableNode table, PTableType tableType, List<ColumnName> columnRefs, boolean ifExists) {
+            super(table, tableType, columnRefs, ifExists);
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            throw new ExecuteQueryNotApplicableException("ALTER TABLE", this.toString());
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeUpdate();
+            return false;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            MetaDataClient client = new MetaDataClient(connection);
+            MutationState state = client.dropColumn(this);
+            lastQueryPlan = null;
+            lastResultSet = null;
+            lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+            lastUpdateOperation = UpdateOperation.UPSERTED;
+            return lastUpdateCount;
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return null;
+        }
+
+        @Override
+        public StatementPlan compilePlan() throws SQLException {
+            return new StatementPlan() {
+
+                @Override
+                public ParameterMetaData getParameterMetaData() {
+                    return new PhoenixParameterMetaData(0);
+                }
+
+                @Override
+                public ExplainPlan getExplainPlan() throws SQLException {
+                    return new ExplainPlan(Collections.singletonList("ALTER " + getTableType() + " DROP COLUMN"));
+                }
+            };
+        }
+        
+        @Override
+        public StatementPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+
+    private static final byte[] EXPLAIN_PLAN_FAMILY = QueryConstants.SINGLE_COLUMN_FAMILY;
+    private static final byte[] EXPLAIN_PLAN_COLUMN = PDataType.VARCHAR.toBytes("Plan");
+    private static final String EXPLAIN_PLAN_ALIAS = "PLAN";
+    private static final String EXPLAIN_PLAN_TABLE_NAME = "PLAN_TABLE";
+    private static final PDatum EXPLAIN_PLAN_DATUM = new PDatum() {
+        @Override
+        public boolean isNullable() {
+            return false;
+        }
+        @Override
+        public PDataType getDataType() {
+            return PDataType.VARCHAR;
+        }
+        @Override
+        public Integer getByteSize() {
+            return null;
+        }
+        @Override
+        public Integer getMaxLength() {
+            return null;
+        }
+        @Override
+        public Integer getScale() {
+            return null;
+        }
+		@Override
+		public ColumnModifier getColumnModifier() {
+			return null;
+		}
+    };
+    private static final RowProjector EXPLAIN_PLAN_ROW_PROJECTOR = new RowProjector(Arrays.<ColumnProjector>asList(
+            new ExpressionProjector(EXPLAIN_PLAN_ALIAS, EXPLAIN_PLAN_TABLE_NAME, 
+                    new RowKeyColumnExpression(EXPLAIN_PLAN_DATUM,
+                            new RowKeyValueAccessor(Collections.<PDatum>singletonList(EXPLAIN_PLAN_DATUM), 0)), false)
+            ), 0, true);
+    private class ExecutableExplainStatement extends ExplainStatement implements ExecutableStatement {
+
+        public ExecutableExplainStatement(BindableStatement statement) {
+            super(statement);
+        }
+
+        @Override
+        public ExecutableStatement getStatement() {
+            return (ExecutableStatement) super.getStatement();
+        }
+        
+        @Override
+        public int getBindCount() {
+            return getStatement().getBindCount();
+        }
+
+        @Override
+        public PhoenixResultSet executeQuery() throws SQLException {
+            StatementPlan plan = getStatement().optimizePlan();
+            List<String> planSteps = plan.getExplainPlan().getPlanSteps();
+            List<Tuple> tuples = Lists.newArrayListWithExpectedSize(planSteps.size());
+            for (String planStep : planSteps) {
+                Tuple tuple = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(PDataType.VARCHAR.toBytes(planStep), EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_COLUMN, MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY));
+                tuples.add(tuple);
+            }
+            PhoenixResultSet rs = new PhoenixResultSet(new MaterializedResultIterator(tuples),EXPLAIN_PLAN_ROW_PROJECTOR, new PhoenixStatement(connection));
+            lastResultSet = rs;
+            lastQueryPlan = null;
+            lastUpdateCount = NO_UPDATE;
+            return rs;
+        }
+
+        @Override
+        public boolean execute() throws SQLException {
+            executeQuery();
+            return true;
+        }
+
+        @Override
+        public int executeUpdate() throws SQLException {
+            throw new ExecuteUpdateNotApplicableException("ALTER TABLE", this.toString());
+        }
+
+        @Override
+        public ResultSetMetaData getResultSetMetaData() throws SQLException {
+            return new PhoenixResultSetMetaData(connection, EXPLAIN_PLAN_ROW_PROJECTOR);
+        }
+
+        @Override
+        public StatementPlan compilePlan() throws SQLException {
+            return StatementPlan.EMPTY_PLAN;
+        }
+        
+        @Override
+        public StatementPlan optimizePlan() throws SQLException {
+            return compilePlan();
+        }
+    }
+
+    protected class ExecutableNodeFactory extends ParseNodeFactory {
+        @Override
+        public ExecutableSelectStatement select(List<? extends TableNode> from, HintNode hint, boolean isDistinct, List<AliasedNode> select,
+                                                ParseNode where, List<ParseNode> groupBy, ParseNode having,
+                                                List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
+            return new ExecutableSelectStatement(from, hint, isDistinct, select, where, groupBy == null ? Collections.<ParseNode>emptyList() : groupBy, having, orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate);
+        }
+        
+        @Override
+        public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
+            return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount);
+        }
+        
+        @Override
+        public ExecutableDeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount) {
+            return new ExecutableDeleteStatement(table, hint, whereNode, orderBy, limit, bindCount);
+        }
+        
+        @Override
+        public CreateTableStatement createTable(TableName tableName, ListMultimap<String,Pair<String,Object>> props, List<ColumnDef> columns, PrimaryKeyConstraint pkConstraint,
+                List<ParseNode> splits, PTableType tableType, boolean ifNotExists, TableName baseTableName, ParseNode tableTypeIdNode, int bindCount) {
+            return new ExecutableCreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
+        }
+        
+        @Override
+        public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith, ParseNode incrementBy, ParseNode cacheSize, boolean ifNotExists, int bindCount){
+        	return new ExecutableCreateSequenceStatement(tableName, startsWith, incrementBy, cacheSize, ifNotExists, bindCount);
+        }
+        
+        @Override
+        public DropSequenceStatement dropSequence(TableName tableName, boolean ifExists, int bindCount){
+            return new ExecutableDropSequenceStatement(tableName, ifExists, bindCount);
+        }
+        
+        @Override
+        public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, PrimaryKeyConstraint pkConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, int bindCount) {
+            return new ExecutableCreateIndexStatement(indexName, dataTable, pkConstraint, includeColumns, splits, props, ifNotExists, bindCount);
+        }
+        
+        @Override
+        public AddColumnStatement addColumn(NamedTableNode table,  PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, Map<String,Object> props) {
+            return new ExecutableAddColumnStatement(table, tableType, columnDefs, ifNotExists, props);
+        }
+        
+        @Override
+        public DropColumnStatement dropColumn(NamedTableNode table,  PTableType tableType, List<ColumnName> columnNodes, boolean ifExists) {
+            return new ExecutableDropColumnStatement(table, tableType, columnNodes, ifExists);
+        }
+        
+        @Override
+        public DropTableStatement dropTable(TableName tableName, PTableType tableType, boolean ifExists) {
+            return new ExecutableDropTableStatement(tableName, tableType, ifExists);
+        }
+        
+        @Override
+        public DropIndexStatement dropIndex(NamedNode indexName, TableName tableName, boolean ifExists) {
+            return new ExecutableDropIndexStatement(indexName, tableName, ifExists);
+        }
+        
+        @Override
+        public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
+            return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state);
+        }
+        
+        @Override
+        public ExplainStatement explain(BindableStatement statement) {
+            return new ExecutableExplainStatement(statement);
+        }
+    }
+    
+    static class PhoenixStatementParser extends SQLParser {
+        PhoenixStatementParser(String query, ParseNodeFactory nodeFactory) throws IOException {
+            super(query, nodeFactory);
+        }
+
+        PhoenixStatementParser(Reader reader) throws IOException {
+            super(reader);
+        }
+        
+        @Override
+        public ExecutableStatement nextStatement(ParseNodeFactory nodeFactory) throws SQLException {
+            return (ExecutableStatement) super.nextStatement(nodeFactory);
+        }
+
+        @Override
+        public ExecutableStatement parseStatement() throws SQLException {
+            return (ExecutableStatement) super.parseStatement();
+        }
+    }
+    
+    public Format getFormatter(PDataType type) {
+        return connection.getFormatter(type);
+    }
+    
+    @Override
+    public void addBatch(String sql) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void cancel() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void clearBatch() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void clearWarnings() throws SQLException {
+    }
+
+    @Override
+    public void close() throws SQLException {
+        try {
+            List<PhoenixResultSet> resultSets = this.resultSets;
+            // Create new list so that remove of the PhoenixResultSet
+            // during closeAll doesn't needless do a linear search
+            // on this list.
+            this.resultSets = Lists.newArrayList();
+            SQLCloseables.closeAll(resultSets);
+        } finally {
+            try {
+                connection.removeStatement(this);
+            } finally {
+                isClosed = true;
+            }
+        }
+    }
+
+    public List<Object> getParameters() {
+        return Collections.<Object>emptyList();
+    }
+    
+    protected ExecutableStatement parseStatement(String sql) throws SQLException {
+        PhoenixStatementParser parser = null;
+        try {
+            parser = new PhoenixStatementParser(sql, new ExecutableNodeFactory());
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        }
+        ExecutableStatement statement = parser.parseStatement();
+        return statement;
+    }
+    
+    @Override
+    public boolean execute(String sql) throws SQLException {
+        return parseStatement(sql).execute();
+    }
+
+    public QueryPlan optimizeQuery(String sql) throws SQLException {
+        return (QueryPlan)parseStatement(sql).optimizePlan();
+    }
+
+    public QueryPlan compileQuery(String sql) throws SQLException {
+        return (QueryPlan)parseStatement(sql).compilePlan();
+    }
+
+    @Override
+    public ResultSet executeQuery(String sql) throws SQLException {
+        return parseStatement(sql).executeQuery();
+    }
+
+    @Override
+    public int executeUpdate(String sql) throws SQLException {
+        return parseStatement(sql).executeUpdate();
+    }
+
+    @Override
+    public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean execute(String sql, String[] columnNames) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public int[] executeBatch() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public PhoenixConnection getConnection() {
+        return connection;
+    }
+
+    @Override
+    public int getFetchDirection() throws SQLException {
+        return ResultSet.FETCH_FORWARD;
+    }
+
+    @Override
+    public int getFetchSize() throws SQLException {
+        return connection.getQueryServices().getProps().getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE);
+    }
+
+    @Override
+    public ResultSet getGeneratedKeys() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public int getMaxFieldSize() throws SQLException {
+        return 0; // TODO: 4000?
+    }
+
+    @Override
+    public int getMaxRows() throws SQLException {
+        return maxRows;
+    }
+
+    @Override
+    public boolean getMoreResults() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public boolean getMoreResults(int current) throws SQLException {
+        return false;
+    }
+
+    @Override
+    public int getQueryTimeout() throws SQLException {
+        return connection.getQueryServices().getProps().getInt(QueryServices.KEEP_ALIVE_MS_ATTRIB, 0) / 1000;
+    }
+
+    // For testing
+    public QueryPlan getQueryPlan() {
+        return lastQueryPlan;
+    }
+    
+    @Override
+    public ResultSet getResultSet() throws SQLException {
+        ResultSet rs = lastResultSet;
+        lastResultSet = null;
+        return rs;
+    }
+
+    @Override
+    public int getResultSetConcurrency() throws SQLException {
+        return ResultSet.CONCUR_READ_ONLY;
+    }
+
+    @Override
+    public int getResultSetHoldability() throws SQLException {
+        // TODO: not sure this matters
+        return ResultSet.CLOSE_CURSORS_AT_COMMIT;
+    }
+
+    @Override
+    public int getResultSetType() throws SQLException {
+        return ResultSet.TYPE_FORWARD_ONLY;
+    }
+
+    public UpdateOperation getUpdateOperation() {
+        return lastUpdateOperation;
+    }
+    
+    @Override
+    public int getUpdateCount() throws SQLException {
+        int updateCount = lastUpdateCount;
+        // Only first call can get the update count, otherwise
+        // some SQL clients get into an infinite loop when an
+        // update occurs.
+        lastUpdateCount = NO_UPDATE;
+        return updateCount;
+    }
+
+    @Override
+    public SQLWarning getWarnings() throws SQLException {
+        return null;
+    }
+
+    @Override
+    public boolean isClosed() throws SQLException {
+        return isClosed;
+    }
+
+    @Override
+    public boolean isPoolable() throws SQLException {
+        return false;
+    }
+
+    @Override
+    public void setCursorName(String name) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void setEscapeProcessing(boolean enable) throws SQLException {
+        // TODO: any escaping we need to do?
+    }
+
+    @Override
+    public void setFetchDirection(int direction) throws SQLException {
+        if (direction != ResultSet.FETCH_FORWARD) {
+            throw new SQLFeatureNotSupportedException();
+        }
+    }
+
+    @Override
+    public void setFetchSize(int rows) throws SQLException {
+        // TODO: map to Scan.setBatch() ?
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void setMaxFieldSize(int max) throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public void setMaxRows(int max) throws SQLException {
+        this.maxRows = max;
+    }
+
+    @Override
+    public void setPoolable(boolean poolable) throws SQLException {
+        if (poolable) {
+            throw new SQLFeatureNotSupportedException();
+        }
+    }
+
+    @Override
+    public void setQueryTimeout(int seconds) throws SQLException {
+        // The Phoenix setting for this is shared across all connections currently
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return iface.isInstance(this);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T unwrap(Class<T> iface) throws SQLException {
+        if (!iface.isInstance(this)) {
+            throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
+                .setMessage(this.getClass().getName() + " not unwrappable from " + iface.getName())
+                .build().buildException();
+        }
+        return (T)this;
+    }
+
+    @Override
+    public void closeOnCompletion() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+
+    @Override
+    public boolean isCloseOnCompletion() throws SQLException {
+        throw new SQLFeatureNotSupportedException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatementFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatementFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatementFactory.java
new file mode 100644
index 0000000..5081049
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatementFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+public interface PhoenixStatementFactory {
+    public PhoenixStatement newStatement(PhoenixConnection connection);
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
new file mode 100644
index 0000000..e77e4ca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
@@ -0,0 +1,314 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.job;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * An bounded blocking queue implementation that keeps a virtual queue of elements on per-producer
+ * basis and iterates through each producer queue in round robin fashion.
+ *
+ */
+public abstract class AbstractRoundRobinQueue<E> extends AbstractQueue<E>
+        implements BlockingQueue<E>{
+
+    /**
+     * Construct an AbstractBlockingRoundRobinQueue that limits the size of the queued elements
+     * to at most maxSize. Attempts to insert new elements after that point will cause the
+     * caller to block.
+     * @param maxSize
+     */
+    public AbstractRoundRobinQueue(int maxSize) {
+        this(maxSize, false);
+    }
+    /**
+     * @param newProducerToFront If true, new producers go to the front of the round-robin list, if false, they go to the end.
+     */
+    public AbstractRoundRobinQueue(int maxSize, boolean newProducerToFront) {
+        this.producerMap = new HashMap<Object,ProducerList<E>>();
+        this.producerLists = new LinkedList<ProducerList<E>>();
+        this.lock = new Object();
+        this.newProducerToFront = newProducerToFront;
+        this.maxSize = maxSize;
+    }
+
+    @Override
+    public Iterator<E> iterator() {
+        synchronized(lock) {
+            ArrayList<E> allElements = new ArrayList<E>(this.size);
+            ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
+            while(iter.hasNext()) {
+                ProducerList<E> tList = iter.next();
+                allElements.addAll(tList.list);
+            }
+            return allElements.iterator();
+        }
+    }
+
+    @Override
+    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
+        boolean taken = false;
+        long endAt = System.currentTimeMillis() + unit.toMillis(timeout);
+        synchronized(lock) {
+            long waitTime = endAt - System.currentTimeMillis();
+            while (!(taken = offer(o)) && waitTime > 0) {
+                this.lock.wait(waitTime);
+                waitTime = endAt - System.currentTimeMillis();
+            }
+        }
+        return taken;
+    }
+
+    @Override
+    public boolean offer(E o) {
+        if (o == null)
+            throw new NullPointerException();
+
+        final Object producerKey = extractProducer(o);
+
+        ProducerList<E> producerList = null;
+        synchronized(lock) {
+            if (this.size == this.maxSize) {
+                return false;
+            }
+            producerList = this.producerMap.get(producerKey);
+            if (producerList == null) {
+                producerList = new ProducerList<E>(producerKey);
+                this.producerMap.put(producerKey, producerList);
+                this.producerLists.add(this.currentProducer, producerList);
+                if (!this.newProducerToFront) {
+                    incrementCurrentProducerPointer();
+                }
+            }
+            producerList.list.add(o);
+            this.size++;
+            lock.notifyAll();
+        }
+        return true;
+    }
+    
+    /**
+     * Implementations must extracts the producer object which is used as the key to identify a unique producer.
+     */
+    protected abstract Object extractProducer(E o);
+
+    @Override
+    public void put(E o) {
+        offer(o);
+    }
+
+    @Override
+    public E take() throws InterruptedException {
+        synchronized(lock) {
+            while (this.size == 0) {
+                this.lock.wait();
+            }
+            E element = poll();
+            assert element != null;
+            return element;
+        }
+    }
+
+    @Override
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        long endAt = System.currentTimeMillis() + unit.toMillis(timeout);
+        synchronized(lock) {
+            long waitTime = endAt - System.currentTimeMillis();
+            while (this.size == 0 && waitTime > 0) {
+                this.lock.wait(waitTime);
+                waitTime = endAt - System.currentTimeMillis();
+            }
+            return poll();
+        }
+    }
+
+    @Override
+    public E poll() {
+        synchronized(lock) {
+            ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
+            while (iter.hasNext()) {
+                ProducerList<E> tList = iter.next();
+                if (tList.list.isEmpty()) {
+                    iter.remove();
+                    this.producerMap.remove(tList.producer);
+                    adjustCurrentProducerPointer();
+                } else {
+                    E element = tList.list.removeFirst();
+                    this.size--;
+                    assert element != null;
+                    // This is the round robin part. When we take an element from the current thread's queue
+                    // we move on to the next thread.
+                    if (tList.list.isEmpty()) {
+                        iter.remove();
+                        this.producerMap.remove(tList.producer);
+                        adjustCurrentProducerPointer();
+                    } else {
+                        incrementCurrentProducerPointer();
+                    }
+                    lock.notifyAll();
+                    return element;
+                }
+            }
+            assert this.size == 0;
+        }
+        return null;
+    }
+
+    /**
+     * Polls using the given producer key.
+     */
+    protected E pollProducer(Object producer) {
+        synchronized(lock) {
+            ProducerList<E> tList = this.producerMap.get(producer);
+            if (tList != null && !tList.list.isEmpty()) {
+                E element = tList.list.removeFirst();
+                this.size--;
+                if (tList.list.isEmpty()) {
+                    this.producerLists.remove(tList);
+                    this.producerMap.remove(tList.producer);
+                    // we need to adjust the current thread pointer in case it pointed to this thread list, which is now removed
+                    adjustCurrentProducerPointer();
+                }
+                lock.notifyAll();
+                assert element != null;
+                // Since this is only processing the current thread's work, we'll leave the
+                // round-robin part alone and just return the work
+                return element;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public E peek() {
+        synchronized(lock) {
+            ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
+            while (iter.hasNext()) {
+                ProducerList<E> tList = iter.next();
+                if (tList.list.isEmpty()) {
+                    iter.remove();
+                    this.producerMap.remove(tList.producer);
+                    adjustCurrentProducerPointer();
+                } else {
+                    E element = tList.list.getFirst();
+                    assert element != null;
+                    return element;
+                }
+            }
+            assert this.size == 0;
+        }
+        return null;
+    }
+
+    @Override
+    public int drainTo(Collection<? super E> c) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+
+        synchronized(this.lock) {
+            int originalSize = this.size;
+            int drained = drainTo(c, this.size);
+            assert drained == originalSize;
+            assert this.size == 0;
+            assert this.producerLists.isEmpty();
+            assert this.producerMap.isEmpty();
+            return drained;
+        }
+    }
+
+    @Override
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        if (c == null)
+            throw new NullPointerException();
+        if (c == this)
+            throw new IllegalArgumentException();
+
+        synchronized(this.lock) {
+            int i = 0;
+            while(i < maxElements) {
+                E element = poll();
+                if (element != null) {
+                    c.add(element);
+                    i++;
+                } else {
+                    break;
+                }
+            }
+            return i;
+        }
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public int size() {
+        synchronized(this.lock) {
+            return this.size;
+        }
+    }
+    
+    private void incrementCurrentProducerPointer() {
+        synchronized(lock) {
+            if (this.producerLists.size() == 0) {
+                this.currentProducer = 0;
+            } else {
+                this.currentProducer = (this.currentProducer+1)%this.producerLists.size();
+            }
+        }
+    }
+    
+    /**
+     * Adjusts the current pointer to a decrease in size.
+     */
+    private void adjustCurrentProducerPointer() {
+        synchronized(lock) {
+            if (this.producerLists.size() == 0) {
+                this.currentProducer = 0;
+            } else {
+                this.currentProducer = (this.currentProducer)%this.producerLists.size();
+            }
+        }
+    }
+
+    private static class ProducerList<E> {
+        public ProducerList(Object producer) {
+            this.producer = producer;
+            this.list = new LinkedList<E>();
+        }
+        private final Object producer;
+        private final LinkedList<E> list;
+    }
+
+    private final Map<Object,ProducerList<E>> producerMap;
+    private final LinkedList<ProducerList<E>> producerLists;
+    private final Object lock;
+    private final boolean newProducerToFront;
+    private int currentProducer;
+    private int size;
+    private int maxSize;
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
new file mode 100644
index 0000000..7d17a6d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.job;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * 
+ * Thread pool executor that executes scans in parallel
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+@SuppressWarnings("rawtypes")
+public class JobManager<T> extends AbstractRoundRobinQueue<T> {
+	
+    private static final AtomicLong PHOENIX_POOL_INDEX = new AtomicLong(1);
+	
+    public JobManager(int maxSize) {
+        super(maxSize, true); // true -> new producers move to front of queue; this reduces latency.
+    }
+
+	@Override
+    protected Object extractProducer(T o) {
+        return ((JobFutureTask)o).getJobId();
+    }        
+
+    public static interface JobRunnable<T> extends Runnable {
+        public Object getJobId();
+    }
+
+    public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize) {
+        BlockingQueue<Runnable> queue;
+        if (queueSize == 0) {
+            queue = new SynchronousQueue<Runnable>(); // Specialized for 0 length.
+        } else {
+            queue = new JobManager<Runnable>(queueSize);
+        }
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
+				"phoenix-" + PHOENIX_POOL_INDEX.getAndIncrement()
+						+ "-thread-%s").build();
+        // For thread pool, set core threads = max threads -- we don't ever want to exceed core threads, but want to go up to core threads *before* using the queue.
+        ThreadPoolExecutor exec = new ThreadPoolExecutor(size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue, threadFactory) {
+            @Override
+            protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
+                // Override this so we can create a JobFutureTask so we can extract out the parentJobId (otherwise, in the default FutureTask, it is private). 
+                return new JobFutureTask<T>(call);
+            }
+    
+            @Override
+            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+                return new JobFutureTask<T>((JobRunnable)runnable, value);
+            }
+            
+        };
+        
+        exec.allowCoreThreadTimeOut(true); // ... and allow core threads to time out.  This just keeps things clean when idle, and is nice for ftests modes, etc., where we'd especially like these not to linger.
+        return exec;
+    }
+
+    /**
+     * Subclasses FutureTask for the sole purpose of providing {@link #getCallable()}, which is used to extract the producer in the {@link JobBasedRoundRobinQueue}
+     */
+    static class JobFutureTask<T> extends FutureTask<T> {
+        private final Object jobId;
+        
+        public JobFutureTask(JobRunnable r, T t) {
+            super(r, t);
+            this.jobId = r.getJobId();
+        }
+        
+        public JobFutureTask(Callable<T> c) {
+            super(c);
+            // FIXME: this fails when executor used by hbase
+            if (c instanceof JobCallable) {
+                this.jobId = ((JobCallable<T>) c).getJobId();
+            } else {
+                this.jobId = this;
+            }
+        }
+        
+        public Object getJobId() {
+            return jobId;
+        }
+    }
+
+
+    /**
+     * Delegating callable implementation that preserves the parentJobId and sets up thread tracker stuff before delegating to the actual command. 
+     */
+    public static interface JobCallable<T> extends Callable<T> {
+        public Object getJobId();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
new file mode 100644
index 0000000..c374618
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.xerial.snappy.Snappy;
+
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * Client for adding cache of one side of a join to region servers
+ * 
+ * @author jtaylor
+ * @since 0.1
+ */
+public class HashCacheClient  {
+    private final ServerCacheClient serverCache;
+    /**
+     * Construct client used to create a serialized cached snapshot of a table and send it to each region server
+     * for caching during hash join processing.
+     * @param connection the client connection
+     */
+    public HashCacheClient(PhoenixConnection connection) {
+        serverCache = new ServerCacheClient(connection);
+    }
+
+    /**
+     * Send the results of scanning through the scanner to all
+     * region servers for regions of the table that will use the cache
+     * that intersect with the minMaxKeyRange.
+     * @param scanner scanner for the table or intermediate results being cached
+     * @return client-side {@link ServerCache} representing the added hash cache
+     * @throws SQLException 
+     * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
+     * size
+     */
+    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef) throws SQLException {
+        /**
+         * Serialize and compress hashCacheTable
+         */
+        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+        serialize(ptr, iterator, estimatedSize, onExpressions);
+        return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
+    }
+    
+    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions) throws SQLException {
+        long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
+        estimatedSize = Math.min(estimatedSize, maxSize);
+        if (estimatedSize > Integer.MAX_VALUE) {
+            throw new IllegalStateException("Estimated size(" + estimatedSize + ") must not be greater than Integer.MAX_VALUE(" + Integer.MAX_VALUE + ")");
+        }
+        try {
+            TrustedByteArrayOutputStream baOut = new TrustedByteArrayOutputStream((int)estimatedSize);
+            DataOutputStream out = new DataOutputStream(baOut);
+            // Write onExpressions first, for hash key evaluation along with deserialization
+            out.writeInt(onExpressions.size());
+            for (Expression expression : onExpressions) {
+                WritableUtils.writeVInt(out, ExpressionType.valueOf(expression).ordinal());
+                expression.write(out);                
+            }
+            int exprSize = baOut.size() + Bytes.SIZEOF_INT;
+            out.writeInt(exprSize);
+            int nRows = 0;
+            out.writeInt(nRows); // In the end will be replaced with total number of rows            
+            for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
+                TupleUtil.write(result, out);
+                if (baOut.size() > maxSize) {
+                    throw new MaxServerCacheSizeExceededException("Size of hash cache (" + baOut.size() + " bytes) exceeds the maximum allowed size (" + maxSize + " bytes)");
+                }
+                nRows++;
+            }
+            TrustedByteArrayOutputStream sizeOut = new TrustedByteArrayOutputStream(Bytes.SIZEOF_INT);
+            DataOutputStream dataOut = new DataOutputStream(sizeOut);
+            try {
+                dataOut.writeInt(nRows);
+                dataOut.flush();
+                byte[] cache = baOut.getBuffer();
+                // Replace number of rows written above with the correct value.
+                System.arraycopy(sizeOut.getBuffer(), 0, cache, exprSize, sizeOut.size());
+                // Reallocate to actual size plus compressed buffer size (which is allocated below)
+                int maxCompressedSize = Snappy.maxCompressedLength(baOut.size());
+                byte[] compressed = new byte[maxCompressedSize]; // size for worst case
+                int compressedSize = Snappy.compress(baOut.getBuffer(), 0, baOut.size(), compressed, 0);
+                // Last realloc to size of compressed buffer.
+                ptr.set(compressed,0,compressedSize);
+            } finally {
+                dataOut.close();
+            }
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            iterator.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
new file mode 100644
index 0000000..6c3f69b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.*;
+import java.sql.SQLException;
+import java.util.*;
+
+import net.jcip.annotations.Immutable;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.xerial.snappy.Snappy;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.HashCache;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.*;
+
+public class HashCacheFactory implements ServerCacheFactory {
+
+    public HashCacheFactory() {
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+    }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+    }
+
+    @Override
+    public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException {
+        try {
+            int size = Snappy.uncompressedLength(cachePtr.get());
+            byte[] uncompressed = new byte[size];
+            Snappy.uncompress(cachePtr.get(), 0, cachePtr.getLength(), uncompressed, 0);
+            return new HashCacheImpl(uncompressed, chunk);
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        }
+    }
+
+    @Immutable
+    private class HashCacheImpl implements HashCache {
+        private final Map<ImmutableBytesPtr,List<Tuple>> hashCache;
+        private final MemoryChunk memoryChunk;
+        
+        private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk) {
+            try {
+                this.memoryChunk = memoryChunk;
+                byte[] hashCacheByteArray = hashCacheBytes;
+                int offset = 0;
+                ByteArrayInputStream input = new ByteArrayInputStream(hashCacheByteArray, offset, hashCacheBytes.length);
+                DataInputStream dataInput = new DataInputStream(input);
+                int nExprs = dataInput.readInt();
+                List<Expression> onExpressions = new ArrayList<Expression>(nExprs);
+                for (int i = 0; i < nExprs; i++) {
+                    int expressionOrdinal = WritableUtils.readVInt(dataInput);
+                    Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+                    expression.readFields(dataInput);
+                    onExpressions.add(expression);                        
+                }
+                int exprSize = dataInput.readInt();
+                offset += exprSize;
+                int nRows = dataInput.readInt();
+                int estimatedSize = SizedUtil.sizeOfMap(nRows, SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE, SizedUtil.RESULT_SIZE) + hashCacheBytes.length;
+                this.memoryChunk.resize(estimatedSize);
+                HashMap<ImmutableBytesPtr,List<Tuple>> hashCacheMap = new HashMap<ImmutableBytesPtr,List<Tuple>>(nRows * 5 / 4);
+                offset += Bytes.SIZEOF_INT;
+                // Build Map with evaluated hash key as key and row as value
+                for (int i = 0; i < nRows; i++) {
+                    int resultSize = (int)Bytes.readVLong(hashCacheByteArray, offset);
+                    offset += WritableUtils.decodeVIntSize(hashCacheByteArray[offset]);
+                    ImmutableBytesWritable value = new ImmutableBytesWritable(hashCacheByteArray,offset,resultSize);
+                    Tuple result = new ResultTuple(new Result(value));
+                    ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, onExpressions);
+                    List<Tuple> tuples = hashCacheMap.get(key);
+                    if (tuples == null) {
+                        tuples = new ArrayList<Tuple>(1);
+                        hashCacheMap.put(key, tuples);
+                    }
+                    tuples.add(result);
+                    offset += resultSize;
+                }
+                this.hashCache = Collections.unmodifiableMap(hashCacheMap);
+            } catch (IOException e) { // Not possible with ByteArrayInputStream
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void close() {
+            memoryChunk.close();
+        }
+        
+        @Override
+        public List<Tuple> get(ImmutableBytesPtr hashKey) {
+            return hashCache.get(hashKey);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
new file mode 100644
index 0000000..efeb717
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class HashJoinInfo {
+    private static final String HASH_JOIN = "HashJoin";
+    
+    private KeyValueSchema joinedSchema;
+    private ImmutableBytesPtr[] joinIds;
+    private List<Expression>[] joinExpressions;
+    private JoinType[] joinTypes;
+    private boolean[] earlyEvaluation;
+    private KeyValueSchema[] schemas;
+    private int[] fieldPositions;
+    private Expression postJoinFilterExpression;
+    
+    public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression) {
+    	this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression);
+    }
+    
+    private static KeyValueSchema[] buildSchemas(PTable[] tables) {
+    	KeyValueSchema[] schemas = new KeyValueSchema[tables.length];
+    	for (int i = 0; i < tables.length; i++) {
+    		schemas[i] = buildSchema(tables[i]);
+    	}
+    	return schemas;
+    }
+    
+    private static KeyValueSchema buildSchema(PTable table) {
+    	KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+    	if (table != null) {
+    	    for (PColumn column : table.getColumns()) {
+    	        if (!SchemaUtil.isPKColumn(column)) {
+    	            builder.addField(column);
+    	        }
+    	    }
+    	}
+        return builder.build();
+    }
+    
+    private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression) {
+    	this.joinedSchema = joinedSchema;
+    	this.joinIds = joinIds;
+        this.joinExpressions = joinExpressions;
+        this.joinTypes = joinTypes;
+        this.earlyEvaluation = earlyEvaluation;
+        this.schemas = schemas;
+        this.fieldPositions = fieldPositions;
+        this.postJoinFilterExpression = postJoinFilterExpression;
+    }
+    
+    public KeyValueSchema getJoinedSchema() {
+    	return joinedSchema;
+    }
+    
+    public ImmutableBytesPtr[] getJoinIds() {
+        return joinIds;
+    }
+    
+    public List<Expression>[] getJoinExpressions() {
+        return joinExpressions;
+    }
+    
+    public JoinType[] getJoinTypes() {
+        return joinTypes;
+    }
+    
+    public boolean[] earlyEvaluation() {
+    	return earlyEvaluation;
+    }
+    
+    public KeyValueSchema[] getSchemas() {
+    	return schemas;
+    }
+    
+    public int[] getFieldPositions() {
+    	return fieldPositions;
+    }
+    
+    public Expression getPostJoinFilterExpression() {
+        return postJoinFilterExpression;
+    }
+    
+    public static void serializeHashJoinIntoScan(Scan scan, HashJoinInfo joinInfo) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            joinInfo.joinedSchema.write(output);
+            int count = joinInfo.joinIds.length;
+            WritableUtils.writeVInt(output, count);
+            for (int i = 0; i < count; i++) {
+                joinInfo.joinIds[i].write(output);
+                WritableUtils.writeVInt(output, joinInfo.joinExpressions[i].size());
+                for (Expression expr : joinInfo.joinExpressions[i]) {
+                    WritableUtils.writeVInt(output, ExpressionType.valueOf(expr).ordinal());
+                    expr.write(output);
+                }
+                WritableUtils.writeVInt(output, joinInfo.joinTypes[i].ordinal());
+                output.writeBoolean(joinInfo.earlyEvaluation[i]);
+                joinInfo.schemas[i].write(output);
+                WritableUtils.writeVInt(output, joinInfo.fieldPositions[i]);
+            }
+            if (joinInfo.postJoinFilterExpression != null) {
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(joinInfo.postJoinFilterExpression).ordinal());
+                joinInfo.postJoinFilterExpression.write(output);
+            } else {
+                WritableUtils.writeVInt(output, -1);
+            }
+            scan.setAttribute(HASH_JOIN, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        
+    }
+    
+    @SuppressWarnings("unchecked")
+    public static HashJoinInfo deserializeHashJoinFromScan(Scan scan) {
+        byte[] join = scan.getAttribute(HASH_JOIN);
+        if (join == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(join);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            KeyValueSchema joinedSchema = new KeyValueSchema();
+            joinedSchema.readFields(input);
+            int count = WritableUtils.readVInt(input);
+            ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
+            List<Expression>[] joinExpressions = new List[count];
+            JoinType[] joinTypes = new JoinType[count];
+            boolean[] earlyEvaluation = new boolean[count];
+            KeyValueSchema[] schemas = new KeyValueSchema[count];
+            int[] fieldPositions = new int[count];
+            for (int i = 0; i < count; i++) {
+                joinIds[i] = new ImmutableBytesPtr();
+                joinIds[i].readFields(input);
+                int nExprs = WritableUtils.readVInt(input);
+                joinExpressions[i] = new ArrayList<Expression>(nExprs);
+                for (int j = 0; j < nExprs; j++) {
+                    int expressionOrdinal = WritableUtils.readVInt(input);
+                    Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+                    expression.readFields(input);
+                    joinExpressions[i].add(expression);                    
+                }
+                int type = WritableUtils.readVInt(input);
+                joinTypes[i] = JoinType.values()[type];
+                earlyEvaluation[i] = input.readBoolean();
+                schemas[i] = new KeyValueSchema();
+                schemas[i].readFields(input);
+                fieldPositions[i] = WritableUtils.readVInt(input);
+            }
+            Expression postJoinFilterExpression = null;
+            int expressionOrdinal = WritableUtils.readVInt(input);
+            if (expressionOrdinal != -1) {
+                postJoinFilterExpression = ExpressionType.values()[expressionOrdinal].newInstance();
+                postJoinFilterExpression.readFields(input);
+            }
+            return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+}