Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:37 UTC
[21/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
new file mode 100644
index 0000000..8592f49
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -0,0 +1,1258 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.text.Format;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.CreateIndexCompiler;
+import org.apache.phoenix.compile.CreateSequenceCompiler;
+import org.apache.phoenix.compile.CreateTableCompiler;
+import org.apache.phoenix.compile.DeleteCompiler;
+import org.apache.phoenix.compile.DropSequenceCompiler;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.ExpressionProjector;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.QueryCompiler;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.compile.UpsertCompiler;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
+import org.apache.phoenix.iterate.MaterializedResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.parse.AddColumnStatement;
+import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.AlterIndexStatement;
+import org.apache.phoenix.parse.BindableStatement;
+import org.apache.phoenix.parse.ColumnDef;
+import org.apache.phoenix.parse.ColumnName;
+import org.apache.phoenix.parse.CreateIndexStatement;
+import org.apache.phoenix.parse.CreateSequenceStatement;
+import org.apache.phoenix.parse.CreateTableStatement;
+import org.apache.phoenix.parse.DeleteStatement;
+import org.apache.phoenix.parse.DropColumnStatement;
+import org.apache.phoenix.parse.DropIndexStatement;
+import org.apache.phoenix.parse.DropSequenceStatement;
+import org.apache.phoenix.parse.DropTableStatement;
+import org.apache.phoenix.parse.ExplainStatement;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.LimitNode;
+import org.apache.phoenix.parse.NamedNode;
+import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.OrderByNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.PrimaryKeyConstraint;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.parse.TableNode;
+import org.apache.phoenix.parse.UpsertStatement;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.ColumnModifier;
+import org.apache.phoenix.schema.ExecuteQueryNotApplicableException;
+import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeyValueAccessor;
+import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SQLCloseable;
+import org.apache.phoenix.util.SQLCloseables;
+import org.apache.phoenix.util.ServerUtil;
+
+
+/**
+ *
+ * JDBC Statement implementation for Phoenix.
+ * Currently only the following methods are supported:
+ * - {@link #executeQuery(String)}
+ * - {@link #executeUpdate(String)}
+ * - {@link #execute(String)}
+ * - {@link #getResultSet()}
+ * - {@link #getUpdateCount()}
+ * - {@link #close()}
+ * The Statement only supports the following options:
+ * - ResultSet.FETCH_FORWARD
+ * - ResultSet.TYPE_FORWARD_ONLY
+ * - ResultSet.CLOSE_CURSORS_AT_COMMIT
+ *
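+ * A minimal usage sketch (illustrative only: it assumes the Phoenix JDBC driver
+ * is registered and that the hypothetical quorum in the URL below is reachable):
+ * <pre>{@code
+ * Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
+ * Statement stmt = conn.createStatement();
+ * stmt.executeUpdate("CREATE TABLE t (k BIGINT NOT NULL PRIMARY KEY, v VARCHAR)");
+ * stmt.executeUpdate("UPSERT INTO t VALUES (1, 'a')");
+ * conn.commit();
+ * ResultSet rs = stmt.executeQuery("SELECT k, v FROM t");
+ * while (rs.next()) {
+ *     System.out.println(rs.getLong(1) + " " + rs.getString(2));
+ * }
+ * conn.close();
+ * }</pre>
+ *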
+ * @author jtaylor
+ * @since 0.1
+ */
+public class PhoenixStatement implements Statement, SQLCloseable, org.apache.phoenix.jdbc.Jdbc7Shim.Statement {
+ public enum UpdateOperation {
+ DELETED("deleted"),
+ UPSERTED("upserted");
+
+ private final String toString;
+ UpdateOperation(String toString) {
+ this.toString = toString;
+ }
+
+ @Override
+ public String toString() {
+ return toString;
+ }
+ };
+
+ protected final PhoenixConnection connection;
+ private static final int NO_UPDATE = -1;
+ private List<PhoenixResultSet> resultSets = new ArrayList<PhoenixResultSet>();
+ private QueryPlan lastQueryPlan;
+ private PhoenixResultSet lastResultSet;
+ private int lastUpdateCount = NO_UPDATE;
+ private UpdateOperation lastUpdateOperation;
+ private boolean isClosed = false;
+ private ResultSetMetaData resultSetMetaData;
+ private int maxRows;
+
+
+ public PhoenixStatement(PhoenixConnection connection) {
+ this.connection = connection;
+ }
+
+ protected List<PhoenixResultSet> getResultSets() {
+ return resultSets;
+ }
+
+ protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector) throws SQLException {
+ return new PhoenixResultSet(iterator, projector, PhoenixStatement.this);
+ }
+
+ protected static interface ExecutableStatement extends BindableStatement {
+ public boolean execute() throws SQLException;
+ public int executeUpdate() throws SQLException;
+ public PhoenixResultSet executeQuery() throws SQLException;
+ public ResultSetMetaData getResultSetMetaData() throws SQLException;
+ public StatementPlan optimizePlan() throws SQLException;
+ public StatementPlan compilePlan() throws SQLException;
+ }
+
+ protected static interface MutatableStatement extends ExecutableStatement {
+ @Override
+ public MutationPlan optimizePlan() throws SQLException;
+ }
+
+ private class ExecutableSelectStatement extends SelectStatement implements ExecutableStatement {
+ private ExecutableSelectStatement(List<? extends TableNode> from, HintNode hint, boolean isDistinct, List<AliasedNode> select, ParseNode where,
+ List<ParseNode> groupBy, ParseNode having, List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
+ super(from, hint, isDistinct, select, where, groupBy, having, orderBy, limit, bindCount, isAggregate);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ QueryPlan plan = optimizePlan();
+ PhoenixResultSet rs = newResultSet(plan.iterator(), plan.getProjector());
+ resultSets.add(rs);
+ lastResultSet = rs;
+ lastUpdateCount = NO_UPDATE;
+ lastUpdateOperation = null;
+ return rs;
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeQuery();
+ return true;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ throw new ExecuteUpdateNotApplicableException(this.toString());
+ }
+
+ @Override
+ public QueryPlan optimizePlan() throws SQLException {
+ return lastQueryPlan = connection.getQueryServices().getOptimizer().optimize(this, PhoenixStatement.this);
+ }
+
+ @Override
+ public StatementPlan compilePlan() throws SQLException {
+ return new QueryCompiler(PhoenixStatement.this).compile(this);
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ if (resultSetMetaData == null) {
+ // Just compile top level query without optimizing to get ResultSetMetaData
+ QueryPlan plan = new QueryCompiler(PhoenixStatement.this).compile(this);
+ resultSetMetaData = new PhoenixResultSetMetaData(connection, plan.getProjector());
+ }
+ return resultSetMetaData;
+ }
+ }
+
+ private int executeMutation(MutationPlan plan) throws SQLException {
+ // Note that UPSERT SELECT statements will need to commit any open transaction
+ // here, since they update data directly from coprocessors and should thus
+ // operate on the latest state.
+ MutationState state = plan.execute();
+ connection.getMutationState().join(state);
+ if (connection.getAutoCommit()) {
+ connection.commit();
+ }
+ lastResultSet = null;
+ lastQueryPlan = null;
+ // Unfortunately, JDBC uses an int for update count, so we
+ // just max out at Integer.MAX_VALUE
+ long updateCount = state.getUpdateCount();
+ lastUpdateCount = (int)Math.min(Integer.MAX_VALUE, updateCount);
+ return lastUpdateCount;
+ }
+
+ private class ExecutableUpsertStatement extends UpsertStatement implements MutatableStatement {
+ private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
+ super(table, hintNode, columns, values, select, bindCount);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("upsert", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ lastUpdateOperation = UpdateOperation.UPSERTED;
+ return executeMutation(optimizePlan());
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public MutationPlan compilePlan() throws SQLException {
+ UpsertCompiler compiler = new UpsertCompiler(PhoenixStatement.this);
+ return compiler.compile(this);
+ }
+
+ @Override
+ public MutationPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableDeleteStatement extends DeleteStatement implements MutatableStatement {
+ private ExecutableDeleteStatement(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount) {
+ super(table, hint, whereNode, orderBy, limit, bindCount);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("delete", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ lastUpdateOperation = UpdateOperation.DELETED;
+ return executeMutation(optimizePlan());
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public MutationPlan compilePlan() throws SQLException {
+ DeleteCompiler compiler = new DeleteCompiler(PhoenixStatement.this);
+ return compiler.compile(this);
+ }
+
+ @Override
+ public MutationPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableCreateTableStatement extends CreateTableStatement implements ExecutableStatement {
+ ExecutableCreateTableStatement(TableName tableName, ListMultimap<String,Pair<String,Object>> props, List<ColumnDef> columnDefs,
+ PrimaryKeyConstraint pkConstraint, List<ParseNode> splitNodes, PTableType tableType, boolean ifNotExists,
+ TableName baseTableName, ParseNode tableTypeIdNode, int bindCount) {
+ super(tableName, props, columnDefs, pkConstraint, splitNodes, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("CREATE TABLE", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MutationPlan plan = optimizePlan();
+ MutationState state = plan.execute();
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.UPSERTED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public MutationPlan compilePlan() throws SQLException {
+ CreateTableCompiler compiler = new CreateTableCompiler(PhoenixStatement.this);
+ return compiler.compile(this);
+ }
+
+ @Override
+ public MutationPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableCreateIndexStatement extends CreateIndexStatement implements ExecutableStatement {
+
+ public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, PrimaryKeyConstraint pkConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
+ ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, int bindCount) {
+ super(indexName, dataTable, pkConstraint, includeColumns, splits, props, ifNotExists, bindCount);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("CREATE INDEX", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MutationPlan plan = optimizePlan();
+ MutationState state = plan.execute();
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.UPSERTED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public MutationPlan compilePlan() throws SQLException {
+ CreateIndexCompiler compiler = new CreateIndexCompiler(PhoenixStatement.this);
+ return compiler.compile(this);
+ }
+
+ @Override
+ public MutationPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableCreateSequenceStatement extends CreateSequenceStatement implements ExecutableStatement {
+
+ public ExecutableCreateSequenceStatement(TableName sequenceName, ParseNode startWith, ParseNode incrementBy, ParseNode cacheSize, boolean ifNotExists, int bindCount) {
+ super(sequenceName, startWith, incrementBy, cacheSize, ifNotExists, bindCount);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("CREATE SEQUENCE", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MutationPlan plan = optimizePlan();
+ MutationState state = plan.execute();
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.UPSERTED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public MutationPlan compilePlan() throws SQLException {
+ CreateSequenceCompiler compiler = new CreateSequenceCompiler(PhoenixStatement.this);
+ return compiler.compile(this);
+ }
+
+ @Override
+ public MutationPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableDropSequenceStatement extends DropSequenceStatement implements ExecutableStatement {
+
+
+ public ExecutableDropSequenceStatement(TableName sequenceName, boolean ifExists, int bindCount) {
+ super(sequenceName, ifExists, bindCount);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("DROP SEQUENCE", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MutationPlan plan = optimizePlan();
+ MutationState state = plan.execute();
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.UPSERTED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public MutationPlan compilePlan() throws SQLException {
+ DropSequenceCompiler compiler = new DropSequenceCompiler(PhoenixStatement.this);
+ return compiler.compile(this);
+ }
+
+ @Override
+ public MutationPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableDropTableStatement extends DropTableStatement implements ExecutableStatement {
+
+ ExecutableDropTableStatement(TableName tableName, PTableType tableType, boolean ifExists) {
+ super(tableName, tableType, ifExists);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("DROP TABLE", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MetaDataClient client = new MetaDataClient(connection);
+ MutationState state = client.dropTable(this);
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.DELETED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public StatementPlan compilePlan() throws SQLException {
+ return new StatementPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("DROP TABLE"));
+ }
+ };
+ }
+
+ @Override
+ public StatementPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableDropIndexStatement extends DropIndexStatement implements ExecutableStatement {
+
+ public ExecutableDropIndexStatement(NamedNode indexName, TableName tableName, boolean ifExists) {
+ super(indexName, tableName, ifExists);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("DROP INDEX", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MetaDataClient client = new MetaDataClient(connection);
+ MutationState state = client.dropIndex(this);
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.DELETED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public StatementPlan compilePlan() throws SQLException {
+ return new StatementPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("DROP INDEX"));
+ }
+ };
+ }
+
+ @Override
+ public StatementPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableAlterIndexStatement extends AlterIndexStatement implements ExecutableStatement {
+
+ public ExecutableAlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
+ super(indexTableNode, dataTableName, ifExists, state);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("ALTER INDEX", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MetaDataClient client = new MetaDataClient(connection);
+ MutationState state = client.alterIndex(this);
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.UPSERTED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public StatementPlan compilePlan() throws SQLException {
+ return new StatementPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("ALTER INDEX"));
+ }
+ };
+ }
+
+ @Override
+ public StatementPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableAddColumnStatement extends AddColumnStatement implements ExecutableStatement {
+
+ ExecutableAddColumnStatement(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, Map<String, Object> props) {
+ super(table, tableType, columnDefs, ifNotExists, props);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("ALTER TABLE", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MetaDataClient client = new MetaDataClient(connection);
+ MutationState state = client.addColumn(this);
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.UPSERTED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public StatementPlan compilePlan() throws SQLException {
+ return new StatementPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("ALTER " + getTableType() + " ADD COLUMN"));
+ }
+ };
+ }
+
+ @Override
+ public StatementPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private class ExecutableDropColumnStatement extends DropColumnStatement implements ExecutableStatement {
+
+ ExecutableDropColumnStatement(NamedTableNode table, PTableType tableType, List<ColumnName> columnRefs, boolean ifExists) {
+ super(table, tableType, columnRefs, ifExists);
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ throw new ExecuteQueryNotApplicableException("ALTER TABLE", this.toString());
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeUpdate();
+ return false;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ MetaDataClient client = new MetaDataClient(connection);
+ MutationState state = client.dropColumn(this);
+ lastQueryPlan = null;
+ lastResultSet = null;
+ lastUpdateCount = (int)Math.min(state.getUpdateCount(), Integer.MAX_VALUE);
+ lastUpdateOperation = UpdateOperation.UPSERTED;
+ return lastUpdateCount;
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public StatementPlan compilePlan() throws SQLException {
+ return new StatementPlan() {
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return new PhoenixParameterMetaData(0);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return new ExplainPlan(Collections.singletonList("ALTER " + getTableType() + " DROP COLUMN"));
+ }
+ };
+ }
+
+ @Override
+ public StatementPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ private static final byte[] EXPLAIN_PLAN_FAMILY = QueryConstants.SINGLE_COLUMN_FAMILY;
+ private static final byte[] EXPLAIN_PLAN_COLUMN = PDataType.VARCHAR.toBytes("Plan");
+ private static final String EXPLAIN_PLAN_ALIAS = "PLAN";
+ private static final String EXPLAIN_PLAN_TABLE_NAME = "PLAN_TABLE";
+ private static final PDatum EXPLAIN_PLAN_DATUM = new PDatum() {
+ @Override
+ public boolean isNullable() {
+ return false;
+ }
+ @Override
+ public PDataType getDataType() {
+ return PDataType.VARCHAR;
+ }
+ @Override
+ public Integer getByteSize() {
+ return null;
+ }
+ @Override
+ public Integer getMaxLength() {
+ return null;
+ }
+ @Override
+ public Integer getScale() {
+ return null;
+ }
+ @Override
+ public ColumnModifier getColumnModifier() {
+ return null;
+ }
+ };
+ private static final RowProjector EXPLAIN_PLAN_ROW_PROJECTOR = new RowProjector(Arrays.<ColumnProjector>asList(
+ new ExpressionProjector(EXPLAIN_PLAN_ALIAS, EXPLAIN_PLAN_TABLE_NAME,
+ new RowKeyColumnExpression(EXPLAIN_PLAN_DATUM,
+ new RowKeyValueAccessor(Collections.<PDatum>singletonList(EXPLAIN_PLAN_DATUM), 0)), false)
+ ), 0, true);
+ private class ExecutableExplainStatement extends ExplainStatement implements ExecutableStatement {
+
+ public ExecutableExplainStatement(BindableStatement statement) {
+ super(statement);
+ }
+
+ @Override
+ public ExecutableStatement getStatement() {
+ return (ExecutableStatement) super.getStatement();
+ }
+
+ @Override
+ public int getBindCount() {
+ return getStatement().getBindCount();
+ }
+
+ @Override
+ public PhoenixResultSet executeQuery() throws SQLException {
+ StatementPlan plan = getStatement().optimizePlan();
+ List<String> planSteps = plan.getExplainPlan().getPlanSteps();
+ List<Tuple> tuples = Lists.newArrayListWithExpectedSize(planSteps.size());
+ for (String planStep : planSteps) {
+ Tuple tuple = new SingleKeyValueTuple(KeyValueUtil.newKeyValue(PDataType.VARCHAR.toBytes(planStep), EXPLAIN_PLAN_FAMILY, EXPLAIN_PLAN_COLUMN, MetaDataProtocol.MIN_TABLE_TIMESTAMP, ByteUtil.EMPTY_BYTE_ARRAY));
+ tuples.add(tuple);
+ }
+ PhoenixResultSet rs = new PhoenixResultSet(new MaterializedResultIterator(tuples), EXPLAIN_PLAN_ROW_PROJECTOR, new PhoenixStatement(connection));
+ lastResultSet = rs;
+ lastQueryPlan = null;
+ lastUpdateCount = NO_UPDATE;
+ return rs;
+ }
+
+ @Override
+ public boolean execute() throws SQLException {
+ executeQuery();
+ return true;
+ }
+
+ @Override
+ public int executeUpdate() throws SQLException {
+ throw new ExecuteUpdateNotApplicableException("ALTER TABLE", this.toString());
+ }
+
+ @Override
+ public ResultSetMetaData getResultSetMetaData() throws SQLException {
+ return new PhoenixResultSetMetaData(connection, EXPLAIN_PLAN_ROW_PROJECTOR);
+ }
+
+ @Override
+ public StatementPlan compilePlan() throws SQLException {
+ return StatementPlan.EMPTY_PLAN;
+ }
+
+ @Override
+ public StatementPlan optimizePlan() throws SQLException {
+ return compilePlan();
+ }
+ }
+
+ protected class ExecutableNodeFactory extends ParseNodeFactory {
+ @Override
+ public ExecutableSelectStatement select(List<? extends TableNode> from, HintNode hint, boolean isDistinct, List<AliasedNode> select,
+ ParseNode where, List<ParseNode> groupBy, ParseNode having,
+ List<OrderByNode> orderBy, LimitNode limit, int bindCount, boolean isAggregate) {
+ return new ExecutableSelectStatement(from, hint, isDistinct, select, where, groupBy == null ? Collections.<ParseNode>emptyList() : groupBy, having, orderBy == null ? Collections.<OrderByNode>emptyList() : orderBy, limit, bindCount, isAggregate);
+ }
+
+ @Override
+ public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount) {
+ return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount);
+ }
+
+ @Override
+ public ExecutableDeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount) {
+ return new ExecutableDeleteStatement(table, hint, whereNode, orderBy, limit, bindCount);
+ }
+
+ @Override
+ public CreateTableStatement createTable(TableName tableName, ListMultimap<String,Pair<String,Object>> props, List<ColumnDef> columns, PrimaryKeyConstraint pkConstraint,
+ List<ParseNode> splits, PTableType tableType, boolean ifNotExists, TableName baseTableName, ParseNode tableTypeIdNode, int bindCount) {
+ return new ExecutableCreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
+ }
+
+ @Override
+ public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith, ParseNode incrementBy, ParseNode cacheSize, boolean ifNotExists, int bindCount){
+ return new ExecutableCreateSequenceStatement(tableName, startsWith, incrementBy, cacheSize, ifNotExists, bindCount);
+ }
+
+ @Override
+ public DropSequenceStatement dropSequence(TableName tableName, boolean ifExists, int bindCount){
+ return new ExecutableDropSequenceStatement(tableName, ifExists, bindCount);
+ }
+
+ @Override
+ public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, PrimaryKeyConstraint pkConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, int bindCount) {
+ return new ExecutableCreateIndexStatement(indexName, dataTable, pkConstraint, includeColumns, splits, props, ifNotExists, bindCount);
+ }
+
+ @Override
+ public AddColumnStatement addColumn(NamedTableNode table, PTableType tableType, List<ColumnDef> columnDefs, boolean ifNotExists, Map<String,Object> props) {
+ return new ExecutableAddColumnStatement(table, tableType, columnDefs, ifNotExists, props);
+ }
+
+ @Override
+ public DropColumnStatement dropColumn(NamedTableNode table, PTableType tableType, List<ColumnName> columnNodes, boolean ifExists) {
+ return new ExecutableDropColumnStatement(table, tableType, columnNodes, ifExists);
+ }
+
+ @Override
+ public DropTableStatement dropTable(TableName tableName, PTableType tableType, boolean ifExists) {
+ return new ExecutableDropTableStatement(tableName, tableType, ifExists);
+ }
+
+ @Override
+ public DropIndexStatement dropIndex(NamedNode indexName, TableName tableName, boolean ifExists) {
+ return new ExecutableDropIndexStatement(indexName, tableName, ifExists);
+ }
+
+ @Override
+ public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) {
+ return new ExecutableAlterIndexStatement(indexTableNode, dataTableName, ifExists, state);
+ }
+
+ @Override
+ public ExplainStatement explain(BindableStatement statement) {
+ return new ExecutableExplainStatement(statement);
+ }
+ }
+
+ static class PhoenixStatementParser extends SQLParser {
+ PhoenixStatementParser(String query, ParseNodeFactory nodeFactory) throws IOException {
+ super(query, nodeFactory);
+ }
+
+ PhoenixStatementParser(Reader reader) throws IOException {
+ super(reader);
+ }
+
+ @Override
+ public ExecutableStatement nextStatement(ParseNodeFactory nodeFactory) throws SQLException {
+ return (ExecutableStatement) super.nextStatement(nodeFactory);
+ }
+
+ @Override
+ public ExecutableStatement parseStatement() throws SQLException {
+ return (ExecutableStatement) super.parseStatement();
+ }
+ }
+
+ public Format getFormatter(PDataType type) {
+ return connection.getFormatter(type);
+ }
+
+ @Override
+ public void addBatch(String sql) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public void cancel() throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public void clearBatch() throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ }
+
+ @Override
+ public void close() throws SQLException {
+ try {
+ List<PhoenixResultSet> resultSets = this.resultSets;
+ // Create a new list so that removal of a PhoenixResultSet
+ // during closeAll doesn't needlessly do a linear search
+ // on this list.
+ this.resultSets = Lists.newArrayList();
+ SQLCloseables.closeAll(resultSets);
+ } finally {
+ try {
+ connection.removeStatement(this);
+ } finally {
+ isClosed = true;
+ }
+ }
+ }
+
+ public List<Object> getParameters() {
+ return Collections.<Object>emptyList();
+ }
+
+ protected ExecutableStatement parseStatement(String sql) throws SQLException {
+ PhoenixStatementParser parser = null;
+ try {
+ parser = new PhoenixStatementParser(sql, new ExecutableNodeFactory());
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ }
+ ExecutableStatement statement = parser.parseStatement();
+ return statement;
+ }
+
+ @Override
+ public boolean execute(String sql) throws SQLException {
+ return parseStatement(sql).execute();
+ }
+
+ public QueryPlan optimizeQuery(String sql) throws SQLException {
+ return (QueryPlan)parseStatement(sql).optimizePlan();
+ }
+
+ public QueryPlan compileQuery(String sql) throws SQLException {
+ return (QueryPlan)parseStatement(sql).compilePlan();
+ }
+
+ @Override
+ public ResultSet executeQuery(String sql) throws SQLException {
+ return parseStatement(sql).executeQuery();
+ }
+
+ @Override
+ public int executeUpdate(String sql) throws SQLException {
+ return parseStatement(sql).executeUpdate();
+ }
+
+ @Override
+ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public boolean execute(String sql, int[] columnIndexes) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public boolean execute(String sql, String[] columnNames) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public int[] executeBatch() throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public int executeUpdate(String sql, String[] columnNames) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public int getFetchDirection() throws SQLException {
+ return ResultSet.FETCH_FORWARD;
+ }
+
+ @Override
+ public int getFetchSize() throws SQLException {
+ return connection.getQueryServices().getProps().getInt(QueryServices.SCAN_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_SCAN_CACHE_SIZE);
+ }
+
+ @Override
+ public ResultSet getGeneratedKeys() throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public int getMaxFieldSize() throws SQLException {
+ return 0; // TODO: 4000?
+ }
+
+ @Override
+ public int getMaxRows() throws SQLException {
+ return maxRows;
+ }
+
+ @Override
+ public boolean getMoreResults() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public boolean getMoreResults(int current) throws SQLException {
+ return false;
+ }
+
+ @Override
+ public int getQueryTimeout() throws SQLException {
+ return connection.getQueryServices().getProps().getInt(QueryServices.KEEP_ALIVE_MS_ATTRIB, 0) / 1000;
+ }
+
+ // For testing
+ public QueryPlan getQueryPlan() {
+ return lastQueryPlan;
+ }
+
+ @Override
+ public ResultSet getResultSet() throws SQLException {
+ ResultSet rs = lastResultSet;
+ lastResultSet = null;
+ return rs;
+ }
+
+ @Override
+ public int getResultSetConcurrency() throws SQLException {
+ return ResultSet.CONCUR_READ_ONLY;
+ }
+
+ @Override
+ public int getResultSetHoldability() throws SQLException {
+ // TODO: not sure this matters
+ return ResultSet.CLOSE_CURSORS_AT_COMMIT;
+ }
+
+ @Override
+ public int getResultSetType() throws SQLException {
+ return ResultSet.TYPE_FORWARD_ONLY;
+ }
+
+ public UpdateOperation getUpdateOperation() {
+ return lastUpdateOperation;
+ }
+
+ @Override
+ public int getUpdateCount() throws SQLException {
+ int updateCount = lastUpdateCount;
+ // Only the first call returns the update count; otherwise
+ // some SQL clients get into an infinite loop after an
+ // update occurs.
+ lastUpdateCount = NO_UPDATE;
+ return updateCount;
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return isClosed;
+ }
+
+ @Override
+ public boolean isPoolable() throws SQLException {
+ return false;
+ }
+
+ @Override
+ public void setCursorName(String name) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public void setEscapeProcessing(boolean enable) throws SQLException {
+ // TODO: any escaping we need to do?
+ }
+
+ @Override
+ public void setFetchDirection(int direction) throws SQLException {
+ if (direction != ResultSet.FETCH_FORWARD) {
+ throw new SQLFeatureNotSupportedException();
+ }
+ }
+
+ @Override
+ public void setFetchSize(int rows) throws SQLException {
+ // TODO: map to Scan.setBatch() ?
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public void setMaxFieldSize(int max) throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public void setMaxRows(int max) throws SQLException {
+ this.maxRows = max;
+ }
+
+ @Override
+ public void setPoolable(boolean poolable) throws SQLException {
+ if (poolable) {
+ throw new SQLFeatureNotSupportedException();
+ }
+ }
+
+ @Override
+ public void setQueryTimeout(int seconds) throws SQLException {
+ // The Phoenix setting for this is shared across all connections currently
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return iface.isInstance(this);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ if (!iface.isInstance(this)) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.CLASS_NOT_UNWRAPPABLE)
+ .setMessage(this.getClass().getName() + " not unwrappable from " + iface.getName())
+ .build().buildException();
+ }
+ return (T)this;
+ }
+
+ @Override
+ public void closeOnCompletion() throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+
+ @Override
+ public boolean isCloseOnCompletion() throws SQLException {
+ throw new SQLFeatureNotSupportedException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatementFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatementFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatementFactory.java
new file mode 100644
index 0000000..5081049
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatementFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.jdbc;
+
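+/**
+ * Factory for creating {@link PhoenixStatement} instances for a given connection.
+ * A minimal sketch of an implementation (illustrative):
+ * <pre>{@code
+ * PhoenixStatementFactory factory = new PhoenixStatementFactory() {
+ *     public PhoenixStatement newStatement(PhoenixConnection connection) {
+ *         return new PhoenixStatement(connection);
+ *     }
+ * };
+ * }</pre>
+ */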
+public interface PhoenixStatementFactory {
+ public PhoenixStatement newStatement(PhoenixConnection connection);
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
new file mode 100644
index 0000000..e77e4ca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/AbstractRoundRobinQueue.java
@@ -0,0 +1,314 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.job;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ * A bounded blocking queue implementation that keeps a virtual queue of elements on a
+ * per-producer basis and iterates through each producer queue in round-robin fashion.
+ *
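+ * A minimal sketch of a concrete subclass (the element type and producer key
+ * below are hypothetical; only extractProducer() must be supplied):
+ * <pre>{@code
+ * class TaggedTask {
+ *     final Object producerKey;
+ *     final Runnable work;
+ *     TaggedTask(Object producerKey, Runnable work) {
+ *         this.producerKey = producerKey;
+ *         this.work = work;
+ *     }
+ * }
+ *
+ * AbstractRoundRobinQueue<TaggedTask> queue =
+ *         new AbstractRoundRobinQueue<TaggedTask>(100) {
+ *     protected Object extractProducer(TaggedTask t) {
+ *         return t.producerKey;
+ *     }
+ * };
+ * }</pre>
+ *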
+ */
+public abstract class AbstractRoundRobinQueue<E> extends AbstractQueue<E>
+ implements BlockingQueue<E>{
+
+ /**
+ * Construct an AbstractRoundRobinQueue that limits the number of queued elements
+ * to at most maxSize. Attempts to insert new elements beyond that point will cause
+ * the caller to block.
+ * @param maxSize the maximum number of queued elements
+ */
+ public AbstractRoundRobinQueue(int maxSize) {
+ this(maxSize, false);
+ }
+
+ /**
+ * @param maxSize the maximum number of queued elements
+ * @param newProducerToFront if true, new producers go to the front of the round-robin list; if false, they go to the end
+ */
+ public AbstractRoundRobinQueue(int maxSize, boolean newProducerToFront) {
+ this.producerMap = new HashMap<Object,ProducerList<E>>();
+ this.producerLists = new LinkedList<ProducerList<E>>();
+ this.lock = new Object();
+ this.newProducerToFront = newProducerToFront;
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ synchronized(lock) {
+ ArrayList<E> allElements = new ArrayList<E>(this.size);
+ ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
+ while(iter.hasNext()) {
+ ProducerList<E> tList = iter.next();
+ allElements.addAll(tList.list);
+ }
+ return allElements.iterator();
+ }
+ }
+
+ @Override
+ public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
+ boolean taken = false;
+ long endAt = System.currentTimeMillis() + unit.toMillis(timeout);
+ synchronized(lock) {
+ long waitTime = endAt - System.currentTimeMillis();
+ while (!(taken = offer(o)) && waitTime > 0) {
+ this.lock.wait(waitTime);
+ waitTime = endAt - System.currentTimeMillis();
+ }
+ }
+ return taken;
+ }
+
+ @Override
+ public boolean offer(E o) {
+ if (o == null)
+ throw new NullPointerException();
+
+ final Object producerKey = extractProducer(o);
+
+ ProducerList<E> producerList = null;
+ synchronized(lock) {
+ if (this.size == this.maxSize) {
+ return false;
+ }
+ producerList = this.producerMap.get(producerKey);
+ if (producerList == null) {
+ producerList = new ProducerList<E>(producerKey);
+ this.producerMap.put(producerKey, producerList);
+ this.producerLists.add(this.currentProducer, producerList);
+ if (!this.newProducerToFront) {
+ incrementCurrentProducerPointer();
+ }
+ }
+ producerList.list.add(o);
+ this.size++;
+ lock.notifyAll();
+ }
+ return true;
+ }
+
+ /**
+ * Implementations must extract the producer object, which is used as the key to identify a unique producer.
+ */
+ protected abstract Object extractProducer(E o);
+
+ @Override
+ public void put(E o) throws InterruptedException {
+ // Block until the element is accepted, per the BlockingQueue contract;
+ // offer() returns false while the queue is at maxSize, and poll()
+ // notifies waiters when space frees up.
+ synchronized(lock) {
+ while (!offer(o)) { this.lock.wait(); }
+ }
+ }
+
+ @Override
+ public E take() throws InterruptedException {
+ synchronized(lock) {
+ while (this.size == 0) {
+ this.lock.wait();
+ }
+ E element = poll();
+ assert element != null;
+ return element;
+ }
+ }
+
+ @Override
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ long endAt = System.currentTimeMillis() + unit.toMillis(timeout);
+ synchronized(lock) {
+ long waitTime = endAt - System.currentTimeMillis();
+ while (this.size == 0 && waitTime > 0) {
+ this.lock.wait(waitTime);
+ waitTime = endAt - System.currentTimeMillis();
+ }
+ return poll();
+ }
+ }
+
+ @Override
+ public E poll() {
+ synchronized(lock) {
+ ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
+ while (iter.hasNext()) {
+ ProducerList<E> tList = iter.next();
+ if (tList.list.isEmpty()) {
+ iter.remove();
+ this.producerMap.remove(tList.producer);
+ adjustCurrentProducerPointer();
+ } else {
+ E element = tList.list.removeFirst();
+ this.size--;
+ assert element != null;
+ // This is the round-robin part. When we take an element from the current
+ // producer's queue, we move on to the next producer.
+ if (tList.list.isEmpty()) {
+ iter.remove();
+ this.producerMap.remove(tList.producer);
+ adjustCurrentProducerPointer();
+ } else {
+ incrementCurrentProducerPointer();
+ }
+ lock.notifyAll();
+ return element;
+ }
+ }
+ assert this.size == 0;
+ }
+ return null;
+ }
+
+ /**
+ * Polls using the given producer key.
+ */
+ protected E pollProducer(Object producer) {
+ synchronized(lock) {
+ ProducerList<E> tList = this.producerMap.get(producer);
+ if (tList != null && !tList.list.isEmpty()) {
+ E element = tList.list.removeFirst();
+ this.size--;
+ if (tList.list.isEmpty()) {
+ this.producerLists.remove(tList);
+ this.producerMap.remove(tList.producer);
+ // We need to adjust the current producer pointer in case it pointed to this producer list, which is now removed.
+ adjustCurrentProducerPointer();
+ }
+ lock.notifyAll();
+ assert element != null;
+ // Since this only processes the given producer's work, we leave the
+ // round-robin pointer alone and just return the element.
+ return element;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public E peek() {
+ synchronized(lock) {
+ ListIterator<ProducerList<E>> iter = this.producerLists.listIterator(this.currentProducer);
+ while (iter.hasNext()) {
+ ProducerList<E> tList = iter.next();
+ if (tList.list.isEmpty()) {
+ iter.remove();
+ this.producerMap.remove(tList.producer);
+ adjustCurrentProducerPointer();
+ } else {
+ E element = tList.list.getFirst();
+ assert element != null;
+ return element;
+ }
+ }
+ assert this.size == 0;
+ }
+ return null;
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+
+ synchronized(this.lock) {
+ int originalSize = this.size;
+ int drained = drainTo(c, this.size);
+ assert drained == originalSize;
+ assert this.size == 0;
+ assert this.producerLists.isEmpty();
+ assert this.producerMap.isEmpty();
+ return drained;
+ }
+ }
+
+ @Override
+ public int drainTo(Collection<? super E> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+
+ synchronized(this.lock) {
+ int i = 0;
+ while(i < maxElements) {
+ E element = poll();
+ if (element != null) {
+ c.add(element);
+ i++;
+ } else {
+ break;
+ }
+ }
+ return i;
+ }
+ }
+
+ @Override
+ public int remainingCapacity() {
+ // The queue is bounded by maxSize, so report the space actually left.
+ synchronized(this.lock) {
+ return this.maxSize - this.size;
+ }
+ }
+
+ @Override
+ public int size() {
+ synchronized(this.lock) {
+ return this.size;
+ }
+ }
+
+ private void incrementCurrentProducerPointer() {
+ synchronized(lock) {
+ if (this.producerLists.size() == 0) {
+ this.currentProducer = 0;
+ } else {
+ this.currentProducer = (this.currentProducer+1)%this.producerLists.size();
+ }
+ }
+ }
+
+ /**
+ * Adjusts the current producer pointer after a decrease in the number of producer lists.
+ */
+ private void adjustCurrentProducerPointer() {
+ synchronized(lock) {
+ if (this.producerLists.size() == 0) {
+ this.currentProducer = 0;
+ } else {
+ this.currentProducer = (this.currentProducer)%this.producerLists.size();
+ }
+ }
+ }
+
+ private static class ProducerList<E> {
+ public ProducerList(Object producer) {
+ this.producer = producer;
+ this.list = new LinkedList<E>();
+ }
+ private final Object producer;
+ private final LinkedList<E> list;
+ }
+
+ private final Map<Object,ProducerList<E>> producerMap;
+ private final LinkedList<ProducerList<E>> producerLists;
+ private final Object lock;
+ private final boolean newProducerToFront;
+ private int currentProducer;
+ private int size;
+ private final int maxSize;
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
new file mode 100644
index 0000000..7d17a6d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.job;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ *
+ * Thread pool executor that executes scans in parallel
+ *
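+ * A usage sketch (the pool parameters and job id below are illustrative,
+ * not Phoenix defaults):
+ * <pre>{@code
+ * ThreadPoolExecutor exec = JobManager.createThreadPoolExec(10000, 8, 128);
+ * Future<?> f = exec.submit(new JobManager.JobRunnable<Object>() {
+ *     public Object getJobId() { return "scan-1"; }
+ *     public void run() {
+ *         // one parallel scan chunk would run here
+ *     }
+ * });
+ * }</pre>
+ *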
+ * @author jtaylor
+ * @since 0.1
+ */
+@SuppressWarnings("rawtypes")
+public class JobManager<T> extends AbstractRoundRobinQueue<T> {
+
+ private static final AtomicLong PHOENIX_POOL_INDEX = new AtomicLong(1);
+
+ public JobManager(int maxSize) {
+ super(maxSize, true); // true -> new producers move to front of queue; this reduces latency.
+ }
+
+ @Override
+ protected Object extractProducer(T o) {
+ return ((JobFutureTask)o).getJobId();
+ }
+
+ public static interface JobRunnable<T> extends Runnable {
+ public Object getJobId();
+ }
+
+ public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize) {
+ BlockingQueue<Runnable> queue;
+ if (queueSize == 0) {
+ queue = new SynchronousQueue<Runnable>(); // Specialized for 0 length.
+ } else {
+ queue = new JobManager<Runnable>(queueSize);
+ }
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
+ "phoenix-" + PHOENIX_POOL_INDEX.getAndIncrement()
+ + "-thread-%s").build();
+ // For thread pool, set core threads = max threads -- we don't ever want to exceed core threads, but want to go up to core threads *before* using the queue.
+ ThreadPoolExecutor exec = new ThreadPoolExecutor(size, size, keepAliveMs, TimeUnit.MILLISECONDS, queue, threadFactory) {
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Callable<T> call) {
+ // Override so we can create a JobFutureTask, from which the job id can be
+ // extracted (the default FutureTask keeps its callable private).
+ return new JobFutureTask<T>(call);
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+ return new JobFutureTask<T>((JobRunnable)runnable, value);
+ }
+
+ };
+
+ exec.allowCoreThreadTimeOut(true); // ... and allow core threads to time out. This keeps things clean when idle, and is nice for functional-test modes, where we'd especially like these threads not to linger.
+ return exec;
+ }
+
+ /**
+ * Subclasses FutureTask for the sole purpose of providing {@link #getJobId()}, which is used to extract the producer in {@link JobManager#extractProducer(Object)}.
+ */
+ static class JobFutureTask<T> extends FutureTask<T> {
+ private final Object jobId;
+
+ public JobFutureTask(JobRunnable r, T t) {
+ super(r, t);
+ this.jobId = r.getJobId();
+ }
+
+ public JobFutureTask(Callable<T> c) {
+ super(c);
+ // FIXME: this fails when executor used by hbase
+ if (c instanceof JobCallable) {
+ this.jobId = ((JobCallable<T>) c).getJobId();
+ } else {
+ this.jobId = this;
+ }
+ }
+
+ public Object getJobId() {
+ return jobId;
+ }
+ }
+
+
+ /**
+ * Callable that exposes a job id, used to identify the producer when the task is queued.
+ */
+ public static interface JobCallable<T> extends Callable<T> {
+ public Object getJobId();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
new file mode 100644
index 0000000..c374618
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.xerial.snappy.Snappy;
+
+import org.apache.phoenix.cache.ServerCacheClient;
+import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ *
+ * Client for adding a cache of one side of a join to region servers
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class HashCacheClient {
+ private final ServerCacheClient serverCache;
+ /**
+ * Construct client used to create a serialized cached snapshot of a table and send it to each region server
+ * for caching during hash join processing.
+ * @param connection the client connection
+ */
+ public HashCacheClient(PhoenixConnection connection) {
+ serverCache = new ServerCacheClient(connection);
+ }
+
+ /**
+ * Send the results of scanning through the iterator to all
+ * region servers hosting regions of the table that will use the cache,
+ * for regions that intersect with keyRanges.
+ * @param keyRanges key ranges whose intersecting regions receive the cache
+ * @param iterator iterator over the table or intermediate results being cached
+ * @param estimatedSize estimated size in bytes of the serialized, uncompressed cache
+ * @param onExpressions expressions used to compute the hash key for each row
+ * @param cacheUsingTableRef the table whose regions will use the hash cache
+ * @return client-side {@link ServerCache} representing the added hash cache
+ * @throws SQLException
+ * @throws MaxServerCacheSizeExceededException if the size of the hash cache exceeds the maximum allowed size
+ */
+ public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef) throws SQLException {
+ // Serialize and compress the iterator results into the hash cache payload.
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ serialize(ptr, iterator, estimatedSize, onExpressions);
+ return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
+ }
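+
+ /*
+ * Serialized layout produced by serialize() below (before Snappy compression);
+ * this summary is reconstructed from the code and mirrors what HashCacheFactory reads back:
+ *   int   number of ON expressions
+ *   per expression: vint expression type ordinal, then the expression bytes
+ *   int   exprSize: byte offset of the row count (all bytes above plus this int)
+ *   int   row count (written as 0, then patched once all rows are serialized)
+ *   per row: length-prefixed row bytes, as written by TupleUtil.write()
+ */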
+
+ private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions) throws SQLException {
+ long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
+ estimatedSize = Math.min(estimatedSize, maxSize);
+ if (estimatedSize > Integer.MAX_VALUE) {
+ throw new IllegalStateException("Estimated size (" + estimatedSize + ") must not be greater than Integer.MAX_VALUE (" + Integer.MAX_VALUE + ")");
+ }
+ try {
+ TrustedByteArrayOutputStream baOut = new TrustedByteArrayOutputStream((int)estimatedSize);
+ DataOutputStream out = new DataOutputStream(baOut);
+ // Write onExpressions first, so the hash key can be evaluated against each row during deserialization
+ out.writeInt(onExpressions.size());
+ for (Expression expression : onExpressions) {
+ WritableUtils.writeVInt(out, ExpressionType.valueOf(expression).ordinal());
+ expression.write(out);
+ }
+ // Byte offset of the row count: everything written so far plus the int holding this value itself.
+ int exprSize = baOut.size() + Bytes.SIZEOF_INT;
+ out.writeInt(exprSize);
+ int nRows = 0;
+ out.writeInt(nRows); // Placeholder; patched below with the total number of rows.
+ for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
+ TupleUtil.write(result, out);
+ if (baOut.size() > maxSize) {
+ throw new MaxServerCacheSizeExceededException("Size of hash cache (" + baOut.size() + " bytes) exceeds the maximum allowed size (" + maxSize + " bytes)");
+ }
+ nRows++;
+ }
+ TrustedByteArrayOutputStream sizeOut = new TrustedByteArrayOutputStream(Bytes.SIZEOF_INT);
+ DataOutputStream dataOut = new DataOutputStream(sizeOut);
+ try {
+ dataOut.writeInt(nRows);
+ dataOut.flush();
+ byte[] cache = baOut.getBuffer();
+ // Replace number of rows written above with the correct value.
+ System.arraycopy(sizeOut.getBuffer(), 0, cache, exprSize, sizeOut.size());
+ // Allocate a worst-case buffer for the compressed payload.
+ int maxCompressedSize = Snappy.maxCompressedLength(baOut.size());
+ byte[] compressed = new byte[maxCompressedSize];
+ int compressedSize = Snappy.compress(baOut.getBuffer(), 0, baOut.size(), compressed, 0);
+ // Point ptr at just the compressed bytes.
+ ptr.set(compressed, 0, compressedSize);
+ } finally {
+ dataOut.close();
+ }
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ } finally {
+ iterator.close();
+ }
+ }
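+
+ // Illustrative call sequence (not in the original patch); the variable names are
+ // assumptions for the example, and ServerCache is assumed to expose a close() method:
+ //
+ //   HashCacheClient client = new HashCacheClient(connection);
+ //   ServerCache cache = client.addHashCache(lhsRanges, rhsIterator,
+ //           rhsEstimatedBytes, onExprs, lhsTableRef);
+ //   try { /* execute the join using the cache */ } finally { cache.close(); }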
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
new file mode 100644
index 0000000..6c3f69b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import net.jcip.annotations.Immutable;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.xerial.snappy.Snappy;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.cache.HashCache;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+public class HashCacheFactory implements ServerCacheFactory {
+
+ public HashCacheFactory() {
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ }
+
+ @Override
+ public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException {
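+ // cachePtr holds the Snappy-compressed payload produced by HashCacheClient.serialize().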
+ try {
+ int size = Snappy.uncompressedLength(cachePtr.get());
+ byte[] uncompressed = new byte[size];
+ Snappy.uncompress(cachePtr.get(), 0, cachePtr.getLength(), uncompressed, 0);
+ return new HashCacheImpl(uncompressed, chunk);
+ } catch (IOException e) {
+ throw ServerUtil.parseServerException(e);
+ }
+ }
+
+ @Immutable
+ private static class HashCacheImpl implements HashCache {
+ private final Map<ImmutableBytesPtr,List<Tuple>> hashCache;
+ private final MemoryChunk memoryChunk;
+
+ private HashCacheImpl(byte[] hashCacheBytes, MemoryChunk memoryChunk) {
+ try {
+ this.memoryChunk = memoryChunk;
+ byte[] hashCacheByteArray = hashCacheBytes;
+ int offset = 0;
+ ByteArrayInputStream input = new ByteArrayInputStream(hashCacheByteArray, offset, hashCacheBytes.length);
+ DataInputStream dataInput = new DataInputStream(input);
+ int nExprs = dataInput.readInt();
+ List<Expression> onExpressions = new ArrayList<Expression>(nExprs);
+ for (int i = 0; i < nExprs; i++) {
+ int expressionOrdinal = WritableUtils.readVInt(dataInput);
+ Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+ expression.readFields(dataInput);
+ onExpressions.add(expression);
+ }
+ int exprSize = dataInput.readInt();
+ offset += exprSize;
+ int nRows = dataInput.readInt();
+ int estimatedSize = SizedUtil.sizeOfMap(nRows, SizedUtil.IMMUTABLE_BYTES_WRITABLE_SIZE, SizedUtil.RESULT_SIZE) + hashCacheBytes.length;
+ this.memoryChunk.resize(estimatedSize);
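+ // Initial capacity is padded above nRows to reduce rehashing (HashMap resizes when 75% full).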
+ HashMap<ImmutableBytesPtr,List<Tuple>> hashCacheMap = new HashMap<ImmutableBytesPtr,List<Tuple>>(nRows * 5 / 4);
+ offset += Bytes.SIZEOF_INT;
+ // Build Map with evaluated hash key as key and row as value
+ for (int i = 0; i < nRows; i++) {
+ int resultSize = (int)Bytes.readVLong(hashCacheByteArray, offset);
+ offset += WritableUtils.decodeVIntSize(hashCacheByteArray[offset]);
+ ImmutableBytesWritable value = new ImmutableBytesWritable(hashCacheByteArray,offset,resultSize);
+ Tuple result = new ResultTuple(new Result(value));
+ ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, onExpressions);
+ List<Tuple> tuples = hashCacheMap.get(key);
+ if (tuples == null) {
+ tuples = new ArrayList<Tuple>(1);
+ hashCacheMap.put(key, tuples);
+ }
+ tuples.add(result);
+ offset += resultSize;
+ }
+ this.hashCache = Collections.unmodifiableMap(hashCacheMap);
+ } catch (IOException e) { // Not possible with ByteArrayInputStream
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ memoryChunk.close();
+ }
+
+ @Override
+ public List<Tuple> get(ImmutableBytesPtr hashKey) {
+ return hashCache.get(hashKey);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
new file mode 100644
index 0000000..efeb717
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.io.WritableUtils;
+
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class HashJoinInfo {
+ private static final String HASH_JOIN = "HashJoin";
+
+ private KeyValueSchema joinedSchema;
+ private ImmutableBytesPtr[] joinIds;
+ private List<Expression>[] joinExpressions;
+ private JoinType[] joinTypes;
+ private boolean[] earlyEvaluation;
+ private KeyValueSchema[] schemas;
+ private int[] fieldPositions;
+ private Expression postJoinFilterExpression;
+
+ public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression) {
+ this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression);
+ }
+
+ private static KeyValueSchema[] buildSchemas(PTable[] tables) {
+ KeyValueSchema[] schemas = new KeyValueSchema[tables.length];
+ for (int i = 0; i < tables.length; i++) {
+ schemas[i] = buildSchema(tables[i]);
+ }
+ return schemas;
+ }
+
+ private static KeyValueSchema buildSchema(PTable table) {
+ KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+ if (table != null) {
+ for (PColumn column : table.getColumns()) {
+ if (!SchemaUtil.isPKColumn(column)) {
+ builder.addField(column);
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression) {
+ this.joinedSchema = joinedSchema;
+ this.joinIds = joinIds;
+ this.joinExpressions = joinExpressions;
+ this.joinTypes = joinTypes;
+ this.earlyEvaluation = earlyEvaluation;
+ this.schemas = schemas;
+ this.fieldPositions = fieldPositions;
+ this.postJoinFilterExpression = postJoinFilterExpression;
+ }
+
+ public KeyValueSchema getJoinedSchema() {
+ return joinedSchema;
+ }
+
+ public ImmutableBytesPtr[] getJoinIds() {
+ return joinIds;
+ }
+
+ public List<Expression>[] getJoinExpressions() {
+ return joinExpressions;
+ }
+
+ public JoinType[] getJoinTypes() {
+ return joinTypes;
+ }
+
+ public boolean[] earlyEvaluation() {
+ return earlyEvaluation;
+ }
+
+ public KeyValueSchema[] getSchemas() {
+ return schemas;
+ }
+
+ public int[] getFieldPositions() {
+ return fieldPositions;
+ }
+
+ public Expression getPostJoinFilterExpression() {
+ return postJoinFilterExpression;
+ }
+
+ public static void serializeHashJoinIntoScan(Scan scan, HashJoinInfo joinInfo) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ joinInfo.joinedSchema.write(output);
+ int count = joinInfo.joinIds.length;
+ WritableUtils.writeVInt(output, count);
+ for (int i = 0; i < count; i++) {
+ joinInfo.joinIds[i].write(output);
+ WritableUtils.writeVInt(output, joinInfo.joinExpressions[i].size());
+ for (Expression expr : joinInfo.joinExpressions[i]) {
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(expr).ordinal());
+ expr.write(output);
+ }
+ WritableUtils.writeVInt(output, joinInfo.joinTypes[i].ordinal());
+ output.writeBoolean(joinInfo.earlyEvaluation[i]);
+ joinInfo.schemas[i].write(output);
+ WritableUtils.writeVInt(output, joinInfo.fieldPositions[i]);
+ }
+ if (joinInfo.postJoinFilterExpression != null) {
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(joinInfo.postJoinFilterExpression).ordinal());
+ joinInfo.postJoinFilterExpression.write(output);
+ } else {
+ WritableUtils.writeVInt(output, -1);
+ }
+ scan.setAttribute(HASH_JOIN, stream.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public static HashJoinInfo deserializeHashJoinFromScan(Scan scan) {
+ byte[] join = scan.getAttribute(HASH_JOIN);
+ if (join == null) {
+ return null;
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(join);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ KeyValueSchema joinedSchema = new KeyValueSchema();
+ joinedSchema.readFields(input);
+ int count = WritableUtils.readVInt(input);
+ ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
+ List<Expression>[] joinExpressions = new List[count];
+ JoinType[] joinTypes = new JoinType[count];
+ boolean[] earlyEvaluation = new boolean[count];
+ KeyValueSchema[] schemas = new KeyValueSchema[count];
+ int[] fieldPositions = new int[count];
+ for (int i = 0; i < count; i++) {
+ joinIds[i] = new ImmutableBytesPtr();
+ joinIds[i].readFields(input);
+ int nExprs = WritableUtils.readVInt(input);
+ joinExpressions[i] = new ArrayList<Expression>(nExprs);
+ for (int j = 0; j < nExprs; j++) {
+ int expressionOrdinal = WritableUtils.readVInt(input);
+ Expression expression = ExpressionType.values()[expressionOrdinal].newInstance();
+ expression.readFields(input);
+ joinExpressions[i].add(expression);
+ }
+ int type = WritableUtils.readVInt(input);
+ joinTypes[i] = JoinType.values()[type];
+ earlyEvaluation[i] = input.readBoolean();
+ schemas[i] = new KeyValueSchema();
+ schemas[i].readFields(input);
+ fieldPositions[i] = WritableUtils.readVInt(input);
+ }
+ Expression postJoinFilterExpression = null;
+ int expressionOrdinal = WritableUtils.readVInt(input);
+ if (expressionOrdinal != -1) {
+ postJoinFilterExpression = ExpressionType.values()[expressionOrdinal].newInstance();
+ postJoinFilterExpression.readFields(input);
+ }
+ return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
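+
+ // Round-trip sketch (illustrative, not part of the original patch): the client
+ // attaches the join metadata to the Scan, and the server-side coprocessor
+ // recovers it from the same attribute.
+ //
+ //   Scan scan = new Scan();
+ //   HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo); // client side
+ //   HashJoinInfo recovered = HashJoinInfo.deserializeHashJoinFromScan(scan); // server side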
+
+}