You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 23:15:55 UTC
[39/51] [partial] Initial commit of master branch from github
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
new file mode 100644
index 0000000..69108d3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.IndexUtil;
+
+
+/**
+ * Class that compiles plan to generate initial data values after a DDL command for
+ * index table.
+ */
+public class PostIndexDDLCompiler {
+ private final PhoenixConnection connection;
+ private final TableRef dataTableRef;
+
+ public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef) {
+ this.connection = connection;
+ this.dataTableRef = dataTableRef;
+ }
+
+ public MutationPlan compile(final PTable indexTable) throws SQLException {
+ return new MutationPlan() {
+
+ @Override
+ public PhoenixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return ExplainPlan.EMPTY_PLAN;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ boolean wasAutoCommit = connection.getAutoCommit();
+ try {
+ connection.setAutoCommit(true);
+ /*
+ * Handles:
+ * 1) Populate a newly created table with contents.
+ * 2) Activate the index by setting the INDEX_STATE to
+ */
+ // NOTE: For first version, we would use a upsert/select to populate the new index table and
+ // returns synchronously. Creating an index on an existing table with large amount of data
+ // will as a result take a very very long time.
+ // In the long term, we should change this to an asynchronous process to populate the index
+ // that would allow the user to easily monitor the process of index creation.
+ StringBuilder indexColumns = new StringBuilder();
+ StringBuilder dataColumns = new StringBuilder();
+ for (PColumn col: dataTableRef.getTable().getColumns()) {
+ String indexColName = IndexUtil.getIndexColumnName(col);
+ try {
+ indexTable.getColumn(indexColName);
+ if (col.getFamilyName() != null) {
+ dataColumns.append('"').append(col.getFamilyName()).append("\".");
+ }
+ dataColumns.append('"').append(col.getName()).append("\",");
+ indexColumns.append('"').append(indexColName).append("\",");
+ } catch (ColumnNotFoundException e) {
+ // Catch and ignore - means that this data column is not in the index
+ }
+ }
+ dataColumns.setLength(dataColumns.length()-1);
+ indexColumns.setLength(indexColumns.length()-1);
+ String schemaName = dataTableRef.getTable().getSchemaName().getString();
+ String tableName = indexTable.getTableName().getString();
+
+ StringBuilder updateStmtStr = new StringBuilder();
+ updateStmtStr.append("UPSERT /*+ NO_INDEX */ INTO ").append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(tableName).append("\"(")
+ .append(indexColumns).append(") SELECT ").append(dataColumns).append(" FROM ")
+ .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"');
+ PreparedStatement updateStmt = connection.prepareStatement(updateStmtStr.toString());
+ int rowsUpdated = 0;
+ updateStmt.execute();
+ rowsUpdated = updateStmt.getUpdateCount();
+ // Return number of rows built for index
+ return new MutationState(rowsUpdated, connection);
+ } finally {
+ if (!wasAutoCommit) connection.setAutoCommit(false);
+ }
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
new file mode 100644
index 0000000..13a70a9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java
@@ -0,0 +1,377 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.expression.CoerceExpression;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.ClientAggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.expression.visitor.SingleAggregateFunctionVisitor;
+import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.BindParseNode;
+import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.FamilyWildcardParseNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.parse.SequenceValueParseNode;
+import org.apache.phoenix.parse.WildcardParseNode;
+import org.apache.phoenix.schema.ArgumentTypeMismatchException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.ColumnRef;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PColumnFamily;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.ViewType;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.SizedUtil;
+
+
+/**
+ *
+ * Class that iterates through expressions in SELECT clause and adds projected
+ * columns to scan.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class ProjectionCompiler {
+
+ private ProjectionCompiler() {
+ }
+
+ private static void projectAllColumnFamilies(PTable table, Scan scan) {
+ // Will project all known/declared column families
+ scan.getFamilyMap().clear();
+ for (PColumnFamily family : table.getColumnFamilies()) {
+ scan.addFamily(family.getName().getBytes());
+ }
+ }
+
+ private static void projectColumnFamily(PTable table, Scan scan, byte[] family) {
+ // Will project all colmuns for given CF
+ scan.addFamily(family);
+ }
+
+ public static RowProjector compile(StatementContext context, SelectStatement statement, GroupBy groupBy) throws SQLException {
+ return compile(context, statement, groupBy, Collections.<PColumn>emptyList());
+ }
+
+ private static void projectAllTableColumns(StatementContext context, TableRef tableRef, List<Expression> projectedExpressions, List<ExpressionProjector> projectedColumns) throws SQLException {
+ PTable table = tableRef.getTable();
+ int posOffset = table.getBucketNum() == null ? 0 : 1;
+ // In SELECT *, don't include tenant column for tenant connection
+ if (tableRef.getTable().isMultiTenant() && context.getConnection().getTenantId() != null) {
+ posOffset++;
+ }
+ for (int i = posOffset; i < table.getColumns().size(); i++) {
+ ColumnRef ref = new ColumnRef(tableRef,i);
+ Expression expression = ref.newColumnExpression();
+ projectedExpressions.add(expression);
+ projectedColumns.add(new ExpressionProjector(ref.getColumn().getName().getString(), table.getName().getString(), expression, false));
+ }
+ }
+
+ private static void projectAllIndexColumns(StatementContext context, TableRef tableRef, List<Expression> projectedExpressions, List<ExpressionProjector> projectedColumns) throws SQLException {
+ PTable index = tableRef.getTable();
+ PTable table = context.getConnection().getPMetaData().getTable(index.getParentName().getString());
+ int tableOffset = table.getBucketNum() == null ? 0 : 1;
+ int indexOffset = index.getBucketNum() == null ? 0 : 1;
+ if (index.getColumns().size()-indexOffset != table.getColumns().size()-tableOffset) {
+ // We'll end up not using this by the optimizer, so just throw
+ throw new ColumnNotFoundException(WildcardParseNode.INSTANCE.toString());
+ }
+ for (int i = tableOffset; i < table.getColumns().size(); i++) {
+ PColumn tableColumn = table.getColumns().get(i);
+ PColumn indexColumn = index.getColumn(IndexUtil.getIndexColumnName(tableColumn));
+ ColumnRef ref = new ColumnRef(tableRef,indexColumn.getPosition());
+ Expression expression = ref.newColumnExpression();
+ projectedExpressions.add(expression);
+ ExpressionProjector projector = new ExpressionProjector(tableColumn.getName().getString(), table.getName().getString(), expression, false);
+ projectedColumns.add(projector);
+ }
+ }
+
+ private static void projectTableColumnFamily(StatementContext context, String cfName, TableRef tableRef, List<Expression> projectedExpressions, List<ExpressionProjector> projectedColumns) throws SQLException {
+ PTable table = tableRef.getTable();
+ PColumnFamily pfamily = table.getColumnFamily(cfName);
+ for (PColumn column : pfamily.getColumns()) {
+ ColumnRef ref = new ColumnRef(tableRef, column.getPosition());
+ Expression expression = ref.newColumnExpression();
+ projectedExpressions.add(expression);
+ projectedColumns.add(new ExpressionProjector(column.getName().toString(), table.getName()
+ .getString(), expression, false));
+ }
+ }
+
+ private static void projectIndexColumnFamily(StatementContext context, String cfName, TableRef tableRef, List<Expression> projectedExpressions, List<ExpressionProjector> projectedColumns) throws SQLException {
+ PTable index = tableRef.getTable();
+ PTable table = context.getConnection().getPMetaData().getTable(index.getParentName().getString());
+ PColumnFamily pfamily = table.getColumnFamily(cfName);
+ for (PColumn column : pfamily.getColumns()) {
+ PColumn indexColumn = index.getColumn(IndexUtil.getIndexColumnName(column));
+ ColumnRef ref = new ColumnRef(tableRef, indexColumn.getPosition());
+ Expression expression = ref.newColumnExpression();
+ projectedExpressions.add(expression);
+ projectedColumns.add(new ExpressionProjector(column.getName().toString(),
+ table.getName().getString(), expression, false));
+ }
+ }
+
+ /**
+ * Builds the projection for the scan
+ * @param context query context kept between compilation of different query clauses
+ * @param statement TODO
+ * @param groupBy compiled GROUP BY clause
+ * @param targetColumns list of columns, parallel to aliasedNodes, that are being set for an
+ * UPSERT SELECT statement. Used to coerce expression types to the expected target type.
+ * @return projector used to access row values during scan
+ * @throws SQLException
+ */
+ public static RowProjector compile(StatementContext context, SelectStatement statement, GroupBy groupBy, List<? extends PDatum> targetColumns) throws SQLException {
+ List<AliasedNode> aliasedNodes = statement.getSelect();
+ // Setup projected columns in Scan
+ SelectClauseVisitor selectVisitor = new SelectClauseVisitor(context, groupBy);
+ List<ExpressionProjector> projectedColumns = new ArrayList<ExpressionProjector>();
+ TableRef tableRef = context.getResolver().getTables().get(0);
+ PTable table = tableRef.getTable();
+ boolean isWildcard = false;
+ Scan scan = context.getScan();
+ int index = 0;
+ List<Expression> projectedExpressions = Lists.newArrayListWithExpectedSize(aliasedNodes.size());
+ List<byte[]> projectedFamilies = Lists.newArrayListWithExpectedSize(aliasedNodes.size());
+ for (AliasedNode aliasedNode : aliasedNodes) {
+ ParseNode node = aliasedNode.getNode();
+ // TODO: visitor?
+ if (node instanceof WildcardParseNode) {
+ if (statement.isAggregate()) {
+ ExpressionCompiler.throwNonAggExpressionInAggException(node.toString());
+ }
+ isWildcard = true;
+ if (tableRef.getTable().getType() == PTableType.INDEX && ((WildcardParseNode)node).isRewrite()) {
+ projectAllIndexColumns(context, tableRef, projectedExpressions, projectedColumns);
+ } else {
+ projectAllTableColumns(context, tableRef, projectedExpressions, projectedColumns);
+ }
+ } else if (node instanceof FamilyWildcardParseNode){
+ // Project everything for SELECT cf.*
+ // TODO: support cf.* expressions for multiple tables the same way with *.
+ String cfName = ((FamilyWildcardParseNode) node).getName();
+ // Delay projecting to scan, as when any other column in the column family gets
+ // added to the scan, it overwrites that we want to project the entire column
+ // family. Instead, we do the projection at the end.
+ // TODO: consider having a ScanUtil.addColumn and ScanUtil.addFamily to work
+ // around this, as this code depends on this function being the last place where
+ // columns are projected (which is currently true, but could change).
+ projectedFamilies.add(Bytes.toBytes(cfName));
+ if (tableRef.getTable().getType() == PTableType.INDEX && ((FamilyWildcardParseNode)node).isRewrite()) {
+ projectIndexColumnFamily(context, cfName, tableRef, projectedExpressions, projectedColumns);
+ } else {
+ projectTableColumnFamily(context, cfName, tableRef, projectedExpressions, projectedColumns);
+ }
+ } else {
+ Expression expression = node.accept(selectVisitor);
+ projectedExpressions.add(expression);
+ if (index < targetColumns.size()) {
+ PDatum targetColumn = targetColumns.get(index);
+ if (targetColumn.getDataType() != expression.getDataType()) {
+ PDataType targetType = targetColumn.getDataType();
+ // Check if coerce allowed using more relaxed isCastableTo check, since we promote INTEGER to LONG
+ // during expression evaluation and then convert back to INTEGER on UPSERT SELECT (and we don't have
+ // (an actual value we can specifically check against).
+ if (expression.getDataType() != null && !expression.getDataType().isCastableTo(targetType)) {
+ throw new ArgumentTypeMismatchException(targetType, expression.getDataType(), "column: " + targetColumn);
+ }
+ expression = CoerceExpression.create(expression, targetType);
+ }
+ }
+ if (node instanceof BindParseNode) {
+ context.getBindManager().addParamMetaData((BindParseNode)node, expression);
+ }
+ if (!node.isStateless()) {
+ if (!selectVisitor.isAggregate() && statement.isAggregate()) {
+ ExpressionCompiler.throwNonAggExpressionInAggException(expression.toString());
+ }
+ }
+ String columnAlias = aliasedNode.getAlias() != null ? aliasedNode.getAlias() : SchemaUtil.normalizeIdentifier(aliasedNode.getNode().getAlias());
+ boolean isCaseSensitive = (columnAlias != null && (aliasedNode.isCaseSensitve() || SchemaUtil.isCaseSensitive(columnAlias))) || selectVisitor.isCaseSensitive;
+ String name = columnAlias == null ? expression.toString() : columnAlias;
+ projectedColumns.add(new ExpressionProjector(name, table.getName().getString(), expression, isCaseSensitive));
+ }
+ selectVisitor.reset();
+ index++;
+ }
+
+ table = context.getCurrentTable().getTable(); // switch to current table for scan projection
+ // TODO make estimatedByteSize more accurate by counting the joined columns.
+ int estimatedKeySize = table.getRowKeySchema().getEstimatedValueLength();
+ int estimatedByteSize = 0;
+ for (Map.Entry<byte[],NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
+ PColumnFamily family = table.getColumnFamily(entry.getKey());
+ if (entry.getValue() == null) {
+ for (PColumn column : family.getColumns()) {
+ Integer byteSize = column.getByteSize();
+ estimatedByteSize += SizedUtil.KEY_VALUE_SIZE + estimatedKeySize + (byteSize == null ? RowKeySchema.ESTIMATED_VARIABLE_LENGTH_SIZE : byteSize);
+ }
+ } else {
+ for (byte[] cq : entry.getValue()) {
+ PColumn column = family.getColumn(cq);
+ Integer byteSize = column.getByteSize();
+ estimatedByteSize += SizedUtil.KEY_VALUE_SIZE + estimatedKeySize + (byteSize == null ? RowKeySchema.ESTIMATED_VARIABLE_LENGTH_SIZE : byteSize);
+ }
+ }
+ }
+
+ selectVisitor.compile();
+ // Since we don't have the empty key value in read-only tables,
+ // we must project everything.
+ boolean isProjectEmptyKeyValue = table.getType() != PTableType.VIEW && table.getViewType() != ViewType.MAPPED && !isWildcard;
+ if (isProjectEmptyKeyValue) {
+ for (byte[] family : projectedFamilies) {
+ projectColumnFamily(table, scan, family);
+ }
+ } else {
+ /*
+ * TODO: this could be optimized by detecting:
+ * - if a column is projected that's not in the where clause
+ * - if a column is grouped by that's not in the where clause
+ * - if we're not using IS NULL or CASE WHEN expressions
+ */
+ projectAllColumnFamilies(table,scan);
+ }
+ return new RowProjector(projectedColumns, estimatedByteSize, isProjectEmptyKeyValue);
+ }
+
+ private static class SelectClauseVisitor extends ExpressionCompiler {
+ private static int getMinNullableIndex(List<SingleAggregateFunction> aggFuncs, boolean isUngroupedAggregation) {
+ int minNullableIndex = aggFuncs.size();
+ for (int i = 0; i < aggFuncs.size(); i++) {
+ SingleAggregateFunction aggFunc = aggFuncs.get(i);
+ if (isUngroupedAggregation ? aggFunc.getAggregator().isNullable() : aggFunc.getAggregatorExpression().isNullable()) {
+ minNullableIndex = i;
+ break;
+ }
+ }
+ return minNullableIndex;
+ }
+
+ /**
+ * Track whether or not the projection expression is case sensitive. We use this
+ * information to determine whether or not we normalize the column name passed
+ */
+ private boolean isCaseSensitive;
+ private int elementCount;
+
+ private SelectClauseVisitor(StatementContext context, GroupBy groupBy) {
+ super(context, groupBy);
+ reset();
+ }
+
+
+ /**
+ * Compiles projection by:
+ * 1) Adding RowCount aggregate function if not present when limiting rows. We need this
+ * to track how many rows have been scanned.
+ * 2) Reordering aggregation functions (by putting fixed length aggregates first) to
+ * optimize the positional access of the aggregated value.
+ */
+ private void compile() throws SQLException {
+ final Set<SingleAggregateFunction> aggFuncSet = Sets.newHashSetWithExpectedSize(context.getExpressionManager().getExpressionCount());
+
+ Iterator<Expression> expressions = context.getExpressionManager().getExpressions();
+ while (expressions.hasNext()) {
+ Expression expression = expressions.next();
+ expression.accept(new SingleAggregateFunctionVisitor() {
+ @Override
+ public Iterator<Expression> visitEnter(SingleAggregateFunction function) {
+ aggFuncSet.add(function);
+ return Iterators.emptyIterator();
+ }
+ });
+ }
+ if (aggFuncSet.isEmpty() && groupBy.isEmpty()) {
+ return;
+ }
+ List<SingleAggregateFunction> aggFuncs = new ArrayList<SingleAggregateFunction>(aggFuncSet);
+ Collections.sort(aggFuncs, SingleAggregateFunction.SCHEMA_COMPARATOR);
+
+ int minNullableIndex = getMinNullableIndex(aggFuncs,groupBy.isEmpty());
+ context.getScan().setAttribute(GroupedAggregateRegionObserver.AGGREGATORS, ServerAggregators.serialize(aggFuncs, minNullableIndex));
+ ClientAggregators clientAggregators = new ClientAggregators(aggFuncs, minNullableIndex);
+ context.getAggregationManager().setAggregators(clientAggregators);
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ elementCount = 0;
+ isCaseSensitive = true;
+ }
+
+ @Override
+ protected ColumnRef resolveColumn(ColumnParseNode node) throws SQLException {
+ ColumnRef ref = super.resolveColumn(node);
+ isCaseSensitive = isCaseSensitive && node.isCaseSensitive();
+ return ref;
+ }
+
+ @Override
+ public void addElement(List<Expression> l, Expression element) {
+ elementCount++;
+ isCaseSensitive &= elementCount == 1;
+ super.addElement(l, element);
+ }
+
+ @Override
+ public Expression visit(SequenceValueParseNode node) throws SQLException {
+ if (aggregateFunction != null) {
+ throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_USE_OF_NEXT_VALUE_FOR)
+ .setSchemaName(node.getTableName().getSchemaName())
+ .setTableName(node.getTableName().getTableName()).build().buildException();
+ }
+ return context.getSequenceManager().newSequenceReference(node);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
new file mode 100644
index 0000000..bafacf9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
+import org.apache.phoenix.compile.JoinCompiler.JoinTable;
+import org.apache.phoenix.compile.JoinCompiler.JoinedTableColumnResolver;
+import org.apache.phoenix.compile.JoinCompiler.PTableWrapper;
+import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.BasicQueryPlan;
+import org.apache.phoenix.execute.DegenerateQueryPlan;
+import org.apache.phoenix.execute.HashJoinPlan;
+import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.SQLParser;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.AmbiguousColumnException;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.schema.PDatum;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ScanUtil;
+
+
+
+/**
+ *
+ * Class used to build an executable query plan
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class QueryCompiler {
+ /*
+ * Not using Scan.setLoadColumnFamiliesOnDemand(true) because we don't
+ * want to introduce a dependency on 0.94.5 (where this feature was
+ * introduced). This will do the same thing. Once we do have a
+ * dependency on 0.94.5 or above, switch this around.
+ */
+ private static final String LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR = "_ondemand_";
+ private final PhoenixStatement statement;
+ private final Scan scan;
+ private final Scan scanCopy;
+ private final List<? extends PDatum> targetColumns;
+ private final ParallelIteratorFactory parallelIteratorFactory;
+
+ public QueryCompiler(PhoenixStatement statement) throws SQLException {
+ this(statement, Collections.<PDatum>emptyList(), null);
+ }
+
+ public QueryCompiler(PhoenixStatement statement, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory) throws SQLException {
+ this.statement = statement;
+ this.scan = new Scan();
+ this.targetColumns = targetColumns;
+ this.parallelIteratorFactory = parallelIteratorFactory;
+ if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
+ this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
+ }
+ this.scanCopy = ScanUtil.newScan(scan);
+ }
+
+ /**
+ * Builds an executable query plan from a parsed SQL statement
+ * @param select parsed SQL statement
+ * @return executable query plan
+ * @throws SQLException if mismatched types are found, bind value do not match binds,
+ * or invalid function arguments are encountered.
+ * @throws SQLFeatureNotSupportedException if an unsupported construct is encountered
+ * @throws TableNotFoundException if table name not found in schema
+ * @throws ColumnNotFoundException if column name could not be resolved
+ * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
+ */
+ public QueryPlan compile(SelectStatement select) throws SQLException{
+ return compile(select, scan, false);
+ }
+
+ protected QueryPlan compile(SelectStatement select, Scan scan, boolean asSubquery) throws SQLException{
+ PhoenixConnection connection = statement.getConnection();
+ List<Object> binds = statement.getParameters();
+ ColumnResolver resolver = FromCompiler.getMultiTableResolver(select, connection);
+ // TODO: do this normalization outside of this so as it's not repeated by the optimizer
+ select = StatementNormalizer.normalize(select, resolver);
+ StatementContext context = new StatementContext(statement, resolver, binds, scan);
+
+ if (select.getFrom().size() == 1)
+ return compileSingleQuery(context, select, binds);
+
+ if (!asSubquery) {
+ SelectStatement optimized = JoinCompiler.optimize(context, select, statement);
+ if (optimized != select) {
+ select = optimized;
+ // TODO: this is a relatively expensive operation that shouldn't be
+ // done multiple times
+ resolver = FromCompiler.getMultiTableResolver(select, connection);
+ context.setResolver(resolver);
+ }
+ }
+ JoinSpec join = JoinCompiler.getJoinSpec(context, select);
+ return compileJoinQuery(context, select, binds, join, asSubquery);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected QueryPlan compileJoinQuery(StatementContext context, SelectStatement select, List<Object> binds, JoinSpec join, boolean asSubquery) throws SQLException {
+ byte[] emptyByteArray = new byte[0];
+ List<JoinTable> joinTables = join.getJoinTables();
+ if (joinTables.isEmpty()) {
+ ProjectedPTableWrapper projectedTable = join.createProjectedTable(join.getMainTable(), !asSubquery);
+ ScanProjector.serializeProjectorIntoScan(context.getScan(), JoinCompiler.getScanProjector(projectedTable));
+ context.setCurrentTable(join.getMainTable());
+ context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
+ join.projectColumns(context.getScan(), join.getMainTable());
+ return compileSingleQuery(context, select, binds);
+ }
+
+ boolean[] starJoinVector = JoinCompiler.getStarJoinVector(join);
+ if (starJoinVector != null) {
+ ProjectedPTableWrapper initialProjectedTable = join.createProjectedTable(join.getMainTable(), !asSubquery);
+ PTableWrapper projectedTable = initialProjectedTable;
+ int count = joinTables.size();
+ ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
+ List<Expression>[] joinExpressions = new List[count];
+ List<Expression>[] hashExpressions = new List[count];
+ JoinType[] joinTypes = new JoinType[count];
+ PTable[] tables = new PTable[count];
+ int[] fieldPositions = new int[count];
+ QueryPlan[] joinPlans = new QueryPlan[count];
+ fieldPositions[0] = projectedTable.getTable().getColumns().size() - projectedTable.getTable().getPKColumns().size();
+ for (int i = 0; i < count; i++) {
+ JoinTable joinTable = joinTables.get(i);
+ SelectStatement subStatement = joinTable.getAsSubquery();
+ if (subStatement.getFrom().size() > 1)
+ throw new SQLFeatureNotSupportedException("Sub queries not supported.");
+ ProjectedPTableWrapper subProjTable = join.createProjectedTable(joinTable.getTable(), false);
+ ColumnResolver resolver = JoinCompiler.getColumnResolver(subProjTable);
+ Scan subScan = ScanUtil.newScan(scanCopy);
+ ScanProjector.serializeProjectorIntoScan(subScan, JoinCompiler.getScanProjector(subProjTable));
+ StatementContext subContext = new StatementContext(statement, resolver, binds, subScan);
+ subContext.setCurrentTable(joinTable.getTable());
+ join.projectColumns(subScan, joinTable.getTable());
+ joinPlans[i] = compileSingleQuery(subContext, subStatement, binds);
+ boolean hasPostReference = join.hasPostReference(joinTable.getTable());
+ if (hasPostReference) {
+ tables[i] = subProjTable.getTable();
+ projectedTable = JoinCompiler.mergeProjectedTables(projectedTable, subProjTable, joinTable.getType() == JoinType.Inner);
+ } else {
+ tables[i] = null;
+ }
+ ColumnResolver leftResolver = JoinCompiler.getColumnResolver(starJoinVector[i] ? initialProjectedTable : projectedTable);
+ joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
+ Pair<List<Expression>, List<Expression>> joinConditions = joinTable.compileJoinConditions(context, leftResolver, resolver);
+ joinExpressions[i] = joinConditions.getFirst();
+ hashExpressions[i] = joinConditions.getSecond();
+ joinTypes[i] = joinTable.getType();
+ if (i < count - 1) {
+ fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
+ }
+ }
+ ScanProjector.serializeProjectorIntoScan(context.getScan(), JoinCompiler.getScanProjector(initialProjectedTable));
+ context.setCurrentTable(join.getMainTable());
+ context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
+ join.projectColumns(context.getScan(), join.getMainTable());
+ BasicQueryPlan plan = compileSingleQuery(context, JoinCompiler.getSubqueryWithoutJoin(select, join), binds);
+ Expression postJoinFilterExpression = join.compilePostFilterExpression(context);
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression);
+ return new HashJoinPlan(plan, joinInfo, hashExpressions, joinPlans);
+ }
+
+ JoinTable lastJoinTable = joinTables.get(joinTables.size() - 1);
+ JoinType type = lastJoinTable.getType();
+ if (type == JoinType.Full)
+ throw new SQLFeatureNotSupportedException("Full joins not supported.");
+
+ if (type == JoinType.Right || type == JoinType.Inner) {
+ SelectStatement lhs = JoinCompiler.getSubQueryWithoutLastJoin(select, join);
+ SelectStatement rhs = JoinCompiler.getSubqueryForLastJoinTable(select, join);
+ JoinSpec lhsJoin = JoinCompiler.getSubJoinSpecWithoutPostFilters(join);
+ Scan subScan = ScanUtil.newScan(scanCopy);
+ StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), binds, subScan);
+ QueryPlan lhsPlan = compileJoinQuery(lhsCtx, lhs, binds, lhsJoin, true);
+ ColumnResolver lhsResolver = lhsCtx.getResolver();
+ PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) (lhsResolver)).getPTableWrapper();
+ ProjectedPTableWrapper rhsProjTable = join.createProjectedTable(lastJoinTable.getTable(), !asSubquery);
+ ColumnResolver rhsResolver = JoinCompiler.getColumnResolver(rhsProjTable);
+ ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
+ Pair<List<Expression>, List<Expression>> joinConditions = lastJoinTable.compileJoinConditions(context, lhsResolver, rhsResolver);
+ List<Expression> joinExpressions = joinConditions.getSecond();
+ List<Expression> hashExpressions = joinConditions.getFirst();
+ int fieldPosition = rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size();
+ PTableWrapper projectedTable = JoinCompiler.mergeProjectedTables(rhsProjTable, lhsProjTable, type == JoinType.Inner);
+ ScanProjector.serializeProjectorIntoScan(context.getScan(), JoinCompiler.getScanProjector(rhsProjTable));
+ context.setCurrentTable(lastJoinTable.getTable());
+ context.setResolver(JoinCompiler.getColumnResolver(projectedTable));
+ join.projectColumns(context.getScan(), lastJoinTable.getTable());
+ BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds);
+ Expression postJoinFilterExpression = join.compilePostFilterExpression(context);
+ HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression);
+ return new HashJoinPlan(rhsPlan, joinInfo, new List[] {hashExpressions}, new QueryPlan[] {lhsPlan});
+ }
+
+ // Do not support queries like "A right join B left join C" with hash-joins.
+ throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported.");
+ }
+
+ protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds) throws SQLException{
+ PhoenixConnection connection = statement.getConnection();
+ ColumnResolver resolver = context.getResolver();
+ TableRef tableRef = context.getCurrentTable();
+ // Short circuit out if we're compiling an index query and the index isn't active.
+ // We must do this after the ColumnResolver resolves the table, as we may be updating the local
+ // cache of the index table and it may now be inactive.
+ if (tableRef.getTable().getType() == PTableType.INDEX && tableRef.getTable().getIndexState() != PIndexState.ACTIVE) {
+ return new DegenerateQueryPlan(context, select, tableRef);
+ }
+ PTable table = tableRef.getTable();
+ if (table.getViewStatement() != null) {
+ ParseNode viewNode = new SQLParser(table.getViewStatement()).parseQuery().getWhere();
+ // Push VIEW expression into select
+ select = select.combine(viewNode);
+ }
+ Integer limit = LimitCompiler.compile(context, select);
+
+ GroupBy groupBy = GroupByCompiler.compile(context, select);
+ // Optimize the HAVING clause by finding any group by expressions that can be moved
+ // to the WHERE clause
+ select = HavingCompiler.rewrite(context, select, groupBy);
+ Expression having = HavingCompiler.compile(context, select, groupBy);
+ // Don't pass groupBy when building where clause expression, because we do not want to wrap these
+ // expressions as group by key expressions since they're pre, not post filtered.
+ context.setResolver(FromCompiler.getResolver(select, connection));
+ WhereCompiler.compile(context, select);
+ context.setResolver(resolver); // recover resolver
+ OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit);
+ RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, targetColumns);
+
+ // Final step is to build the query plan
+ int maxRows = statement.getMaxRows();
+ if (maxRows > 0) {
+ if (limit != null) {
+ limit = Math.min(limit, maxRows);
+ } else {
+ limit = maxRows;
+ }
+ }
+ if (select.isAggregate() || select.isDistinct()) {
+ return new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having);
+ } else {
+ return new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
new file mode 100644
index 0000000..d41fb7f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.TableRef;
+
+
+
+/**
+ *
+ * Interface for an executable query plan
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public interface QueryPlan extends StatementPlan {
+ /**
+ * Get a result iterator to iterate over the results
+ * @return result iterator for iterating over the results
+ * @throws SQLException
+ */
+ public ResultIterator iterator() throws SQLException;
+
+ public long getEstimatedSize();
+
+ // TODO: change once joins are supported
+ TableRef getTableRef();
+ /**
+ * Returns projector used to formulate resultSet row
+ */
+ RowProjector getProjector();
+
+ Integer getLimit();
+
+ OrderBy getOrderBy();
+
+ GroupBy getGroupBy();
+
+ List<KeyRange> getSplits();
+
+ StatementContext getContext();
+
+ FilterableStatement getStatement();
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
new file mode 100644
index 0000000..6b6344c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.*;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ *
+ * Class that manages a set of projected columns accessed through the zero-based
+ * column index for a SELECT clause projection. The column index may be looked up
+ * via the name using {@link #getColumnIndex(String)}.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class RowProjector {
+ public static final RowProjector EMPTY_PROJECTOR = new RowProjector(Collections.<ColumnProjector>emptyList(),0, true);
+
+ private final List<? extends ColumnProjector> columnProjectors;
+ private final Map<String,Integer> reverseIndex;
+ private final boolean allCaseSensitive;
+ private final boolean someCaseSensitive;
+ private final int estimatedSize;
+ private final boolean isProjectEmptyKeyValue;
+
+ public RowProjector(RowProjector projector, boolean isProjectEmptyKeyValue) {
+ this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue);
+ }
+ /**
+ * Construct RowProjector based on a list of ColumnProjectors.
+ * @param columnProjectors ordered list of ColumnProjectors corresponding to projected columns in SELECT clause
+ * aggregating coprocessor. Only required in the case of an aggregate query with a limit clause and otherwise may
+ * be null.
+ * @param estimatedRowSize
+ */
+ public RowProjector(List<? extends ColumnProjector> columnProjectors, int estimatedRowSize, boolean isProjectEmptyKeyValue) {
+ this.columnProjectors = Collections.unmodifiableList(columnProjectors);
+ int position = columnProjectors.size();
+ reverseIndex = Maps.newHashMapWithExpectedSize(position);
+ boolean allCaseSensitive = true;
+ boolean someCaseSensitive = false;
+ for (--position; position >= 0; position--) {
+ ColumnProjector colProjector = columnProjectors.get(position);
+ allCaseSensitive &= colProjector.isCaseSensitive();
+ someCaseSensitive |= colProjector.isCaseSensitive();
+ reverseIndex.put(colProjector.getName(), position);
+ }
+ this.allCaseSensitive = allCaseSensitive;
+ this.someCaseSensitive = someCaseSensitive;
+ this.estimatedSize = estimatedRowSize;
+ this.isProjectEmptyKeyValue = isProjectEmptyKeyValue;
+ }
+
+ public boolean isProjectEmptyKeyValue() {
+ return isProjectEmptyKeyValue;
+ }
+
+ public List<? extends ColumnProjector> getColumnProjectors() {
+ return columnProjectors;
+ }
+
+ public int getColumnIndex(String name) throws SQLException {
+ if (!someCaseSensitive) {
+ name = SchemaUtil.normalizeIdentifier(name);
+ }
+ Integer index = reverseIndex.get(name);
+ if (index == null) {
+ if (!allCaseSensitive && someCaseSensitive) {
+ name = SchemaUtil.normalizeIdentifier(name);
+ index = reverseIndex.get(name);
+ }
+ if (index == null) {
+ throw new ColumnNotFoundException(name);
+ }
+ }
+ return index;
+ }
+
+ public ColumnProjector getColumnProjector(int index) {
+ return columnProjectors.get(index);
+ }
+
+ public int getColumnCount() {
+ return columnProjectors.size();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder("[");
+ for (ColumnProjector projector : columnProjectors) {
+ buf.append(projector.getExpression());
+ buf.append(',');
+ }
+ buf.setCharAt(buf.length()-1, ']');
+ return buf.toString();
+ }
+
+ public int getEstimatedRowByteSize() {
+ return estimatedSize;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
new file mode 100644
index 0000000..44f4473
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.util.ScanUtil;
+
+
+public class ScanRanges {
+ private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
+ private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
+ public static final ScanRanges EVERYTHING = new ScanRanges(EVERYTHING_RANGES,null,false);
+ public static final ScanRanges NOTHING = new ScanRanges(NOTHING_RANGES,null,false);
+
+ public static ScanRanges create(List<List<KeyRange>> ranges, RowKeySchema schema) {
+ return create(ranges, schema, false);
+ }
+
+ public static ScanRanges create(List<List<KeyRange>> ranges, RowKeySchema schema, boolean forceRangeScan) {
+ if (ranges.isEmpty()) {
+ return EVERYTHING;
+ } else if (ranges.size() == 1 && ranges.get(0).size() == 1 && ranges.get(0).get(0) == KeyRange.EMPTY_RANGE) {
+ return NOTHING;
+ }
+ return new ScanRanges(ranges, schema, forceRangeScan);
+ }
+
+ private SkipScanFilter filter;
+ private final List<List<KeyRange>> ranges;
+ private final RowKeySchema schema;
+ private final boolean forceRangeScan;
+
+ private ScanRanges (List<List<KeyRange>> ranges, RowKeySchema schema, boolean forceRangeScan) {
+ List<List<KeyRange>> sortedRanges = Lists.newArrayListWithExpectedSize(ranges.size());
+ for (int i = 0; i < ranges.size(); i++) {
+ List<KeyRange> sorted = Lists.newArrayList(ranges.get(i));
+ Collections.sort(sorted, KeyRange.COMPARATOR);
+ sortedRanges.add(ImmutableList.copyOf(sorted));
+ }
+ this.ranges = ImmutableList.copyOf(sortedRanges);
+ this.schema = schema;
+ if (schema != null && !ranges.isEmpty()) {
+ this.filter = new SkipScanFilter(this.ranges, schema);
+ }
+ this.forceRangeScan = forceRangeScan;
+ }
+
+ public SkipScanFilter getSkipScanFilter() {
+ return filter;
+ }
+
+ public List<List<KeyRange>> getRanges() {
+ return ranges;
+ }
+
+ public RowKeySchema getSchema() {
+ return schema;
+ }
+
+ public boolean isEverything() {
+ return this == EVERYTHING;
+ }
+
+ public boolean isDegenerate() {
+ return this == NOTHING;
+ }
+
+ /**
+ * Use SkipScanFilter under two circumstances:
+ * 1) If we have multiple ranges for a given key slot (use of IN)
+ * 2) If we have a range (i.e. not a single/point key) that is
+ * not the last key slot
+ */
+ public boolean useSkipScanFilter() {
+ if (forceRangeScan) {
+ return false;
+ }
+ boolean hasRangeKey = false, useSkipScan = false;
+ for (List<KeyRange> orRanges : ranges) {
+ useSkipScan |= orRanges.size() > 1 | hasRangeKey;
+ if (useSkipScan) {
+ return true;
+ }
+ for (KeyRange range : orRanges) {
+ hasRangeKey |= !range.isSingleKey();
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @return true if this represents the full key to a single row
+ */
+ public boolean isSingleRowScan() {
+ if (schema == null || ranges.size() < schema.getMaxFields()) {
+ return false;
+ }
+ boolean isSingleKey = true;
+ for (List<KeyRange> orRanges : ranges) {
+ if (orRanges.size() > 1) {
+ return false;
+ }
+ isSingleKey &= orRanges.get(0).isSingleKey();
+ }
+ return isSingleKey;
+ }
+
+ public void setScanStartStopRow(Scan scan) {
+ if (isEverything()) {
+ return;
+ }
+ if (isDegenerate()) {
+ scan.setStartRow(KeyRange.EMPTY_RANGE.getLowerRange());
+ scan.setStopRow(KeyRange.EMPTY_RANGE.getUpperRange());
+ return;
+ }
+
+ byte[] expectedKey;
+ expectedKey = ScanUtil.getMinKey(schema, ranges);
+ if (expectedKey != null) {
+ scan.setStartRow(expectedKey);
+ }
+ expectedKey = ScanUtil.getMaxKey(schema, ranges);
+ if (expectedKey != null) {
+ scan.setStopRow(expectedKey);
+ }
+ }
+
+ public static final ImmutableBytesWritable UNBOUND = new ImmutableBytesWritable(KeyRange.UNBOUND);
+
+ /**
+ * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
+ * intersects with any of the scan ranges and false otherwise. We cannot pass in
+ * a KeyRange here, because the underlying compare functions expect lower inclusive
+ * and upper exclusive keys. We cannot get their next key because the key must
+ * conform to the row key schema and if a null byte is added to a lower inclusive
+ * key, it's no longer a valid, real key.
+ * @param lowerInclusiveKey lower inclusive key
+ * @param upperExclusiveKey upper exclusive key
+ * @return true if the scan range intersects with the specified lower/upper key
+ * range
+ */
+ public boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
+ if (isEverything()) {
+ return true;
+ }
+ if (isDegenerate()) {
+ return false;
+ }
+ return filter.hasIntersect(lowerInclusiveKey, upperExclusiveKey);
+ }
+
+ @Override
+ public String toString() {
+ return "ScanRanges[" + ranges.toString() + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
new file mode 100644
index 0000000..665c33b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.phoenix.expression.BaseTerminalExpression;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.parse.SequenceValueParseNode;
+import org.apache.phoenix.parse.TableName;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.schema.PDataType;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.SequenceKey;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+public class SequenceManager {
+ private final PhoenixStatement statement;
+ private int[] sequencePosition;
+ private long[] srcSequenceValues;
+ private long[] dstSequenceValues;
+ private SQLException[] sqlExceptions;
+ private List<SequenceKey> nextSequences;
+ private List<SequenceKey> currentSequences;
+ private Map<SequenceKey,SequenceValueExpression> sequenceMap;
+ private BitSet isNextSequence;
+
+ public SequenceManager(PhoenixStatement statement) {
+ this.statement = statement;
+ }
+
+ public int getSequenceCount() {
+ return sequenceMap == null ? 0 : sequenceMap.size();
+ }
+
+ private void setSequenceValues() throws SQLException {
+ SQLException eTop = null;
+ for (int i = 0; i < sqlExceptions.length; i++) {
+ SQLException e = sqlExceptions[i];
+ if (e != null) {
+ if (eTop == null) {
+ eTop = e;
+ } else {
+ e.setNextException(eTop.getNextException());
+ eTop.setNextException(e);
+ }
+ } else {
+ dstSequenceValues[sequencePosition[i]] = srcSequenceValues[i];
+ }
+ }
+ if (eTop != null) {
+ throw eTop;
+ }
+ }
+
+ public void incrementSequenceValues() throws SQLException {
+ if (sequenceMap == null) {
+ return;
+ }
+ Long scn = statement.getConnection().getSCN();
+ long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+ ConnectionQueryServices services = this.statement.getConnection().getQueryServices();
+ services.incrementSequenceValues(nextSequences, timestamp, srcSequenceValues, sqlExceptions);
+ setSequenceValues();
+ int offset = nextSequences.size();
+ for (int i = 0; i < currentSequences.size(); i++) {
+ dstSequenceValues[sequencePosition[offset+i]] = services.getSequenceValue(currentSequences.get(i), timestamp);
+ }
+ }
+
+ public SequenceValueExpression newSequenceReference(SequenceValueParseNode node) {
+ if (sequenceMap == null) {
+ sequenceMap = Maps.newHashMap();
+ isNextSequence = new BitSet();
+ }
+ PName tenantName = statement.getConnection().getTenantId();
+ String tenantId = tenantName == null ? null : tenantName.getString();
+ TableName tableName = node.getTableName();
+ SequenceKey key = new SequenceKey(tenantId, tableName.getSchemaName(), tableName.getTableName());
+ SequenceValueExpression expression = sequenceMap.get(key);
+ if (expression == null) {
+ int index = sequenceMap.size();
+ expression = new SequenceValueExpression(index);
+ sequenceMap.put(key, expression);
+ }
+ // If we see a NEXT and a CURRENT, treat the CURRENT just like a NEXT
+ if (node.getOp() == SequenceValueParseNode.Op.NEXT_VALUE) {
+ isNextSequence.set(expression.getIndex());
+ }
+
+ return expression;
+ }
+
+ public void initSequences() throws SQLException {
+ if (sequenceMap == null) {
+ return;
+ }
+ int maxSize = sequenceMap.size();
+ dstSequenceValues = new long[maxSize];
+ sequencePosition = new int[maxSize];
+ nextSequences = Lists.newArrayListWithExpectedSize(maxSize);
+ currentSequences = Lists.newArrayListWithExpectedSize(maxSize);
+ for (Map.Entry<SequenceKey, SequenceValueExpression> entry : sequenceMap.entrySet()) {
+ if (isNextSequence.get(entry.getValue().getIndex())) {
+ nextSequences.add(entry.getKey());
+ } else {
+ currentSequences.add(entry.getKey());
+ }
+ }
+ srcSequenceValues = new long[nextSequences.size()];
+ sqlExceptions = new SQLException[nextSequences.size()];
+ Collections.sort(nextSequences);
+ // Create reverse indexes
+ for (int i = 0; i < nextSequences.size(); i++) {
+ sequencePosition[i] = sequenceMap.get(nextSequences.get(i)).getIndex();
+ }
+ int offset = nextSequences.size();
+ for (int i = 0; i < currentSequences.size(); i++) {
+ sequencePosition[i+offset] = sequenceMap.get(currentSequences.get(i)).getIndex();
+ }
+ ConnectionQueryServices services = this.statement.getConnection().getQueryServices();
+ Long scn = statement.getConnection().getSCN();
+ long timestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
+ services.reserveSequenceValues(nextSequences, timestamp, srcSequenceValues, sqlExceptions);
+ setSequenceValues();
+ }
+
+ private class SequenceValueExpression extends BaseTerminalExpression {
+ private final int index;
+ private final byte[] valueBuffer = new byte[PDataType.LONG.getByteSize()];
+
+ private SequenceValueExpression(int index) {
+ this.index = index;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ @Override
+ public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+ PDataType.LONG.getCodec().encodeLong(dstSequenceValues[index], valueBuffer, 0);
+ ptr.set(valueBuffer);
+ return true;
+ }
+
+ @Override
+ public PDataType getDataType() {
+ return PDataType.LONG;
+ }
+
+ @Override
+ public boolean isNullable() {
+ return false;
+ }
+
+ @Override
+ public boolean isDeterministic() {
+ return false;
+ }
+
+ @Override
+ public boolean isStateless() {
+ return true;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
new file mode 100644
index 0000000..5736536
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.text.Format;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.NumberUtil;
+import org.apache.phoenix.util.ScanUtil;
+
+
+/**
+ *
+ * Class that keeps common state used across processing the various clauses in a
+ * top level JDBC statement such as SELECT, UPSERT, DELETE, etc.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class StatementContext {
+ private ColumnResolver resolver;
+ private final BindManager binds;
+ private final Scan scan;
+ private final ExpressionManager expressions;
+ private final AggregationManager aggregates;
+ private final String dateFormat;
+ private final Format dateFormatter;
+ private final Format dateParser;
+ private final String numberFormat;
+ private final ImmutableBytesWritable tempPtr;
+ private final PhoenixStatement statement;
+
+ private long currentTime = QueryConstants.UNSET_TIMESTAMP;
+ private ScanRanges scanRanges = ScanRanges.EVERYTHING;
+ private KeyRange minMaxRange = null;
+ private final SequenceManager sequences;
+
+ private TableRef currentTable;
+
+ public StatementContext(PhoenixStatement statement, ColumnResolver resolver, List<Object> binds, Scan scan) {
+ this.statement = statement;
+ this.resolver = resolver;
+ this.scan = scan;
+ this.binds = new BindManager(binds);
+ this.aggregates = new AggregationManager();
+ this.expressions = new ExpressionManager();
+ PhoenixConnection connection = statement.getConnection();
+ this.dateFormat = connection.getQueryServices().getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
+ this.dateFormatter = DateUtil.getDateFormatter(dateFormat);
+ this.dateParser = DateUtil.getDateParser(dateFormat);
+ this.numberFormat = connection.getQueryServices().getProps().get(QueryServices.NUMBER_FORMAT_ATTRIB, NumberUtil.DEFAULT_NUMBER_FORMAT);
+ this.tempPtr = new ImmutableBytesWritable();
+ this.currentTable = resolver != null && !resolver.getTables().isEmpty() ? resolver.getTables().get(0) : null;
+ this.sequences = new SequenceManager(statement);
+ }
+
+ public String getDateFormat() {
+ return dateFormat;
+ }
+
+ public Format getDateFormatter() {
+ return dateFormatter;
+ }
+
+ public Format getDateParser() {
+ return dateParser;
+ }
+
+ public String getNumberFormat() {
+ return numberFormat;
+ }
+
+ public Scan getScan() {
+ return scan;
+ }
+
+ public BindManager getBindManager() {
+ return binds;
+ }
+
+ public TableRef getCurrentTable() {
+ return currentTable;
+ }
+
+ public void setCurrentTable(TableRef table) {
+ this.currentTable = table;
+ }
+
+ public AggregationManager getAggregationManager() {
+ return aggregates;
+ }
+
+ public ColumnResolver getResolver() {
+ return resolver;
+ }
+
+ public void setResolver(ColumnResolver resolver) {
+ this.resolver = resolver;
+ }
+
+ public ExpressionManager getExpressionManager() {
+ return expressions;
+ }
+
+
+ public ImmutableBytesWritable getTempPtr() {
+ return tempPtr;
+ }
+
+ public ScanRanges getScanRanges() {
+ return this.scanRanges;
+ }
+
+ public void setScanRanges(ScanRanges scanRanges) {
+ setScanRanges(scanRanges, null);
+ }
+
+ public void setScanRanges(ScanRanges scanRanges, KeyRange minMaxRange) {
+ this.scanRanges = scanRanges;
+ this.scanRanges.setScanStartStopRow(scan);
+ PTable table = this.getCurrentTable().getTable();
+ if (minMaxRange != null) {
+ // Ensure minMaxRange is lower inclusive and upper exclusive, as that's
+ // what we need to intersect against for the HBase scan.
+ byte[] lowerRange = minMaxRange.getLowerRange();
+ if (!minMaxRange.lowerUnbound()) {
+ if (!minMaxRange.isLowerInclusive()) {
+ lowerRange = ScanUtil.nextKey(lowerRange, table, tempPtr);
+ }
+ }
+
+ byte[] upperRange = minMaxRange.getUpperRange();
+ if (!minMaxRange.upperUnbound()) {
+ if (minMaxRange.isUpperInclusive()) {
+ upperRange = ScanUtil.nextKey(upperRange, table, tempPtr);
+ }
+ }
+ if (minMaxRange.getLowerRange() != lowerRange || minMaxRange.getUpperRange() != upperRange) {
+ minMaxRange = KeyRange.getKeyRange(lowerRange, true, upperRange, false);
+ }
+ // If we're not salting, we can intersect this now with the scan range.
+ // Otherwise, we have to wait to do this when we chunk up the scan.
+ if (table.getBucketNum() == null) {
+ minMaxRange = minMaxRange.intersect(KeyRange.getKeyRange(scan.getStartRow(), scan.getStopRow()));
+ scan.setStartRow(minMaxRange.getLowerRange());
+ scan.setStopRow(minMaxRange.getUpperRange());
+ }
+ this.minMaxRange = minMaxRange;
+ }
+ }
+
+ public PhoenixConnection getConnection() {
+ return statement.getConnection();
+ }
+
+ public PhoenixStatement getStatement() {
+ return statement;
+ }
+
+ public long getCurrentTime() throws SQLException {
+ long ts = this.getCurrentTable().getTimeStamp();
+ if (ts != QueryConstants.UNSET_TIMESTAMP) {
+ return ts;
+ }
+ if (currentTime != QueryConstants.UNSET_TIMESTAMP) {
+ return currentTime;
+ }
+ /*
+ * For an UPSERT VALUES where autocommit off, we won't hit the server until the commit.
+ * However, if the statement has a CURRENT_DATE() call as a value, we need to know the
+ * current time at execution time. In that case, we'll call MetaDataClient.updateCache
+ * purely to bind the current time based on the server time.
+ */
+ PTable table = this.getCurrentTable().getTable();
+ PhoenixConnection connection = getConnection();
+ MetaDataClient client = new MetaDataClient(connection);
+ currentTime = client.getCurrentTime(table.getSchemaName().getString(), table.getTableName().getString());
+ return currentTime;
+ }
+
+ /**
+ * Get the key range derived from row value constructor usage in where clause. These are orthogonal to the ScanRanges
+ * and form a range for which each scan is intersected against.
+ */
+ public KeyRange getMinMaxRange () {
+ return minMaxRange;
+ }
+
+ public boolean isSingleRowScan() {
+ return this.getScanRanges().isSingleRowScan() && ! (this.getScan().getFilter() instanceof FilterList);
+ }
+
+ public SequenceManager getSequenceManager(){
+ return sequences;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
new file mode 100644
index 0000000..8eb769e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.parse.BetweenParseNode;
+import org.apache.phoenix.parse.ColumnParseNode;
+import org.apache.phoenix.parse.ComparisonParseNode;
+import org.apache.phoenix.parse.LessThanOrEqualParseNode;
+import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeRewriter;
+import org.apache.phoenix.parse.SelectStatement;
+import org.apache.phoenix.util.SchemaUtil;
+
+
+/**
+ *
+ * Class that creates a new select statement ensuring that a literal always occurs
+ * on the RHS (i.e. if literal found on the LHS, then the operator is reversed and
+ * the literal is put on the RHS)
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class StatementNormalizer extends ParseNodeRewriter {
+ private boolean useFullNameForAlias;
+
+ public StatementNormalizer(ColumnResolver resolver, int expectedAliasCount, boolean useFullNameForAlias) {
+ super(resolver, expectedAliasCount);
+ this.useFullNameForAlias = useFullNameForAlias;
+ }
+
+ public static ParseNode normalize(ParseNode where, ColumnResolver resolver) throws SQLException {
+ return rewrite(where, new StatementNormalizer(resolver, 0, false));
+ }
+
+ /**
+ * Rewrite the select statement by switching any constants to the right hand side
+ * of the expression.
+ * @param statement the select statement
+ * @param resolver
+ * @return new select statement
+ * @throws SQLException
+ */
+ public static SelectStatement normalize(SelectStatement statement, ColumnResolver resolver) throws SQLException {
+ return rewrite(statement, new StatementNormalizer(resolver, statement.getSelect().size(), statement.getFrom().size() > 1));
+ }
+
+ @Override
+ public ParseNode visitLeave(ComparisonParseNode node, List<ParseNode> nodes) throws SQLException {
+ if (nodes.get(0).isStateless() && !nodes.get(1).isStateless()) {
+ List<ParseNode> normNodes = Lists.newArrayListWithExpectedSize(2);
+ normNodes.add(nodes.get(1));
+ normNodes.add(nodes.get(0));
+ nodes = normNodes;
+ node = NODE_FACTORY.comparison(node.getInvertFilterOp(), nodes.get(0), nodes.get(1));
+ }
+ return super.visitLeave(node, nodes);
+ }
+
+ @Override
+ public ParseNode visitLeave(final BetweenParseNode node, List<ParseNode> nodes) throws SQLException {
+
+ LessThanOrEqualParseNode lhsNode = NODE_FACTORY.lte(node.getChildren().get(1), node.getChildren().get(0));
+ LessThanOrEqualParseNode rhsNode = NODE_FACTORY.lte(node.getChildren().get(0), node.getChildren().get(2));
+ List<ParseNode> parseNodes = Lists.newArrayListWithExpectedSize(2);
+ parseNodes.add(this.visitLeave(lhsNode, lhsNode.getChildren()));
+ parseNodes.add(this.visitLeave(rhsNode, rhsNode.getChildren()));
+ return super.visitLeave(node, parseNodes);
+ }
+
+ @Override
+ public ParseNode visit(ColumnParseNode node) throws SQLException {
+ if (useFullNameForAlias
+ && node.getAlias() != null
+ && node.getTableName() != null
+ && SchemaUtil.normalizeIdentifier(node.getAlias()).equals(node.getName())) {
+ node = NODE_FACTORY.column(NODE_FACTORY.table(node.getSchemaName(), node.getTableName()), node.isCaseSensitive() ? '"' + node.getName() + '"' : node.getName(), node.getFullName());
+ }
+ return super.visit(node);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/50d523f6/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java
new file mode 100644
index 0000000..e41e86a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementPlan.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compile;
+
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+
+
+public interface StatementPlan {
+ public final StatementPlan EMPTY_PLAN = new StatementPlan() {
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return new PhoenixParameterMetaData(0);
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return ExplainPlan.EMPTY_PLAN;
+ }
+ };
+
+ /**
+ * Returns the ParameterMetaData for the statement
+ */
+ ParameterMetaData getParameterMetaData();
+
+ ExplainPlan getExplainPlan() throws SQLException;
+}