You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/09/14 21:43:46 UTC

[48/50] [abbrv] phoenix git commit: Fix compilation errors from sync with master

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index 1b83be2,0000000..0915d09
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@@ -1,395 -1,0 +1,395 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
 +
 +import java.sql.ParameterMetaData;
 +import java.sql.ResultSet;
 +import java.sql.SQLException;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptTable;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.prepare.Prepare.CatalogReader;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.core.TableModify;
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 +import org.apache.phoenix.calcite.PhoenixTable;
 +import org.apache.phoenix.compile.ColumnResolver;
 +import org.apache.phoenix.compile.ExplainPlan;
 +import org.apache.phoenix.compile.FromCompiler;
 +import org.apache.phoenix.compile.MutationPlan;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.SequenceManager;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.compile.StatementPlan;
 +import org.apache.phoenix.exception.SQLExceptionCode;
 +import org.apache.phoenix.exception.SQLExceptionInfo;
 +import org.apache.phoenix.execute.MutationState;
 +import org.apache.phoenix.execute.MutationState.RowMutationState;
 +import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
 +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 +import org.apache.phoenix.iterate.ResultIterator;
 +import org.apache.phoenix.jdbc.PhoenixConnection;
 +import org.apache.phoenix.jdbc.PhoenixResultSet;
 +import org.apache.phoenix.jdbc.PhoenixStatement;
 +import org.apache.phoenix.query.ConnectionQueryServices;
 +import org.apache.phoenix.query.QueryServices;
 +import org.apache.phoenix.query.QueryServicesOptions;
 +import org.apache.phoenix.schema.IllegalDataException;
 +import org.apache.phoenix.schema.PColumn;
 +import org.apache.phoenix.schema.PName;
 +import org.apache.phoenix.schema.PRow;
 +import org.apache.phoenix.schema.PTable;
 +import org.apache.phoenix.schema.SortOrder;
 +import org.apache.phoenix.schema.TableRef;
 +import org.apache.phoenix.schema.types.PLong;
 +import org.apache.phoenix.util.ByteUtil;
 +import org.apache.phoenix.util.MetaDataUtil;
 +import org.apache.phoenix.util.ScanUtil;
 +import org.apache.phoenix.util.SchemaUtil;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Maps;
 +
 +public class PhoenixTableModify extends TableModify implements PhoenixRel {
 +
 +    public PhoenixTableModify(RelOptCluster cluster, RelTraitSet traits,
 +            RelOptTable table, CatalogReader catalogReader, RelNode child,
 +            Operation operation, List<String> updateColumnList, boolean flattened) {
 +        super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened);
 +        assert operation == Operation.INSERT || operation == Operation.DELETE;
 +    }
 +
 +    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
 +      return new PhoenixTableModify(
 +          getCluster(),
 +          traitSet,
 +          getTable(),
 +          getCatalogReader(),
 +          sole(inputs),
 +          getOperation(),
 +          getUpdateColumnList(),
 +          isFlattened());
 +    }
 +
 +    @Override
 +    public StatementPlan implement(PhoenixRelImplementor implementor) {
 +        final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input);
 +        final RowProjector projector = implementor.getTableMapping().createRowProjector();
 +
 +        final PhoenixTable targetTable = getTable().unwrap(PhoenixTable.class);
 +        final PhoenixConnection connection = targetTable.pc;
 +        final TableRef targetTableRef = targetTable.tableMapping.getTableRef();
 +        
 +        if (getOperation() == Operation.INSERT) {
 +            return upsert(connection, targetTable, targetTableRef, queryPlan, projector);
 +        }
 +        
 +        // delete
 +        return delete(connection, targetTable, targetTableRef, queryPlan, projector);
 +    }
 +    
 +    private static MutationPlan upsert(final PhoenixConnection connection,
 +            final PhoenixTable targetTable, final TableRef targetTableRef,
 +            final QueryPlan queryPlan, final RowProjector projector) {
 +        try (PhoenixStatement stmt = new PhoenixStatement(connection)) {
 +            final ColumnResolver resolver = FromCompiler.getResolver(targetTableRef);
 +            final StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
 +
 +            // TODO TenantId, ViewIndexId, UpdatableViewColumns
 +            final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns();
 +            final int[] columnIndexes = new int[mappedColumns.size()];
 +            final int[] pkSlotIndexes = new int[mappedColumns.size()];
 +            for (int i = 0; i < columnIndexes.length; i++) {
 +                PColumn column = mappedColumns.get(i);
 +                if (SchemaUtil.isPKColumn(column)) {
 +                    pkSlotIndexes[i] = column.getPosition();
 +                }
 +                columnIndexes[i] = column.getPosition();
 +            }
 +            // TODO
 +            final boolean useServerTimestamp = false;
 +            
 +            return new MutationPlan() {
 +                @Override
 +                public ParameterMetaData getParameterMetaData() {
 +                    return queryPlan.getContext().getBindManager().getParameterMetaData();
 +                }
 +
 +                @Override
 +                public StatementContext getContext() {
 +                    return context;
 +                }
 +
 +                @Override
 +                public TableRef getTargetRef() {
 +                    return targetTableRef;
 +                }
 +
 +                @Override
 +                public Set<TableRef> getSourceRefs() {
 +                    // TODO return originalQueryPlan.getSourceRefs();
 +                    return queryPlan.getSourceRefs();
 +                }
 +
 +                @Override
 +                public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
 +                    return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
 +                }
 +
 +                @Override
 +                public MutationState execute() throws SQLException {
 +                    ResultIterator iterator = queryPlan.iterator();
 +                    // simplest version, no run-on-server, no pipelined update
 +                    StatementContext childContext = queryPlan.getContext();
 +                    ConnectionQueryServices services = connection.getQueryServices();
 +                    int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
 +                            QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
 +                    int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
 +                    boolean isAutoCommit = connection.getAutoCommit();
 +                    byte[][] values = new byte[columnIndexes.length][];
 +                    int rowCount = 0;
 +                    Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
 +                    PTable table = targetTableRef.getTable();
 +                    try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
 +                        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 +                        while (rs.next()) {
 +                            for (int i = 0; i < values.length; i++) {
 +                                PColumn column = table.getColumns().get(columnIndexes[i]);
 +                                byte[] bytes = rs.getBytes(i + 1);
 +                                ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
 +                                Object value = rs.getObject(i + 1);
 +                                int rsPrecision = rs.getMetaData().getPrecision(i + 1);
 +                                Integer precision = rsPrecision == 0 ? null : rsPrecision;
 +                                int rsScale = rs.getMetaData().getScale(i + 1);
 +                                Integer scale = rsScale == 0 ? null : rsScale;
 +                                // We are guaranteed that the two columns will have compatible types,
 +                                // as we checked that before.
 +                                if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
 +                                        column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
 +                                        SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
 +                                        .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
 +                                        .buildException(); }
 +                                column.getDataType().coerceBytes(ptr, value, column.getDataType(), 
 +                                        precision, scale, SortOrder.getDefault(), 
 +                                        column.getMaxLength(), column.getScale(), column.getSortOrder(),
 +                                        table.rowKeyOrderOptimizable());
 +                                values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
 +                            }
 +                            setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp);
 +                            rowCount++;
 +                            // Commit a batch if auto commit is true and we're at our batch size
 +                            if (isAutoCommit && rowCount % batchSize == 0) {
 +                                MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
 +                                connection.getMutationState().join(state);
 +                                connection.getMutationState().send();
 +                                mutation.clear();
 +                            }
 +                        }
 +                        // If auto commit is true, this last batch will be committed upon return
 +                        return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
 +                    }
 +                }
 +
 +                @Override
 +                public ExplainPlan getExplainPlan() throws SQLException {
 +                    List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
 +                    List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
 +                    planSteps.add("UPSERT SELECT");
 +                    planSteps.addAll(queryPlanSteps);
 +                    return new ExplainPlan(planSteps);
 +                }                
 +            };
 +        } catch (SQLException e) {
 +            throw new RuntimeException(e);
 +        }
 +    }
 +    
 +    private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp) {
 +        Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
 +        byte[][] pkValues = new byte[table.getPKColumns().size()][];
 +        // If the table uses salting, the first PK slot is the salting byte; set it to a single
 +        // zero-byte placeholder here and we will fill in the byte later in PRowImpl.
 +        if (table.getBucketNum() != null) {
 +            pkValues[0] = new byte[] {0};
 +        }
 +        Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
 +        RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
 +        for (int i = 0; i < values.length; i++) {
 +            byte[] value = values[i];
 +            PColumn column = table.getColumns().get(columnIndexes[i]);
 +            if (SchemaUtil.isPKColumn(column)) {
 +                pkValues[pkSlotIndex[i]] = value;
 +                if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
 +                    if (!useServerTimestamp) {
 +                        PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
 +                        rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
 +                        if (rowTimestamp < 0) {
 +                            throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
 +                        }
 +                        rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
 +                    } 
 +                }
 +            } else {
 +                columnValues.put(column, value);
 +            }
 +        }
 +        ImmutableBytesPtr ptr = new ImmutableBytesPtr();
 +        table.newKey(ptr, pkValues);
 +        mutation.put(ptr, new RowMutationState(columnValues, connection.getStatementExecutionCounter(), rowTsColInfo));
 +    }
 +
 +    private static MutationPlan delete(final PhoenixConnection connection,
 +            final PhoenixTable targetTable, final TableRef targetTableRef,
 +            final QueryPlan queryPlan, final RowProjector projector) {
 +        final StatementContext context = queryPlan.getContext();
 +        // TODO
 +        final boolean deleteFromImmutableIndexToo = false;
 +        return new MutationPlan() {
 +            @Override
 +            public ParameterMetaData getParameterMetaData() {
 +                return context.getBindManager().getParameterMetaData();
 +            }
 +
 +            @Override
 +            public StatementContext getContext() {
 +                return context;
 +            }
 +
 +            @Override
 +            public TableRef getTargetRef() {
 +                return targetTableRef;
 +            }
 +
 +            @Override
 +            public Set<TableRef> getSourceRefs() {
 +                // TODO dataPlan.getSourceRefs();
 +                return queryPlan.getSourceRefs();
 +            }
 +
 +            @Override
 +            public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
 +                return org.apache.phoenix.jdbc.PhoenixStatement.Operation.DELETE;
 +            }
 +
 +            @Override
 +            public MutationState execute() throws SQLException {
 +                ResultIterator iterator = queryPlan.iterator();
 +                try {
 +                    // TODO hasLimit??
 +                    return deleteRows(context, targetTableRef, deleteFromImmutableIndexToo ? queryPlan.getTableRef() : null, iterator, projector, queryPlan.getTableRef());
 +                } finally {
 +                    iterator.close();
 +                }
 +            }
 +
 +            @Override
 +            public ExplainPlan getExplainPlan() throws SQLException {
 +                List<String> queryPlanSteps =  queryPlan.getExplainPlan().getPlanSteps();
 +                List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
 +                planSteps.add("DELETE ROWS");
 +                planSteps.addAll(queryPlanSteps);
 +                return new ExplainPlan(planSteps);
 +            }
 +        };
 +    }
 +    
 +    private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
 +        PTable table = targetTableRef.getTable();
 +        PhoenixStatement statement = childContext.getStatement();
 +        PhoenixConnection connection = statement.getConnection();
 +        PName tenantId = connection.getTenantId();
 +        byte[] tenantIdBytes = null;
 +        if (tenantId != null) {
-             tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId);
++            tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, table.getViewIndexId() != null);
 +        }
 +        final boolean isAutoCommit = connection.getAutoCommit();
 +        ConnectionQueryServices services = connection.getQueryServices();
 +        final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
 +        final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
 +        Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
 +        Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
 +        // If indexTableRef is set, we're deleting the rows from both the index table and
 +        // the data table through a single query to save executing an additional one.
 +        if (indexTableRef != null) {
 +            indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
 +        }
 +        List<PColumn> pkColumns = table.getPKColumns();
 +        boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
 +        boolean isSharedViewIndex = table.getViewIndexId() != null;
 +        int offset = (table.getBucketNum() == null ? 0 : 1);
 +        byte[][] values = new byte[pkColumns.size()][];
 +        if (isMultiTenant) {
 +            values[offset++] = tenantIdBytes;
 +        }
 +        if (isSharedViewIndex) {
 +            values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
 +        }
 +        try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
 +            int rowCount = 0;
 +            while (rs.next()) {
 +                ImmutableBytesPtr ptr = new ImmutableBytesPtr();  // allocate new as this is a key in a Map
 +                // Use tuple directly, as projector would not have all the PK columns from
 +                // our index table inside of our projection. Since the tables are equal,
 +                // there's no translation required.
 +                if (sourceTableRef.equals(targetTableRef)) {
 +                    rs.getCurrentRow().getKey(ptr);
 +                } else {
 +                    for (int i = offset; i < values.length; i++) {
 +                        byte[] byteValue = rs.getBytes(i+1-offset);
 +                        // The ResultSet.getBytes() call will have inverted it - we need to invert it back.
 +                        // TODO: consider going under the hood and just getting the bytes
 +                        if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) {
 +                            byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
 +                            byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length);
 +                        }
 +                        values[i] = byteValue;
 +                    }
 +                    table.newKey(ptr, values);
 +                }
 +                // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
 +                // row key will already have its value. 
 +                mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
 +                if (indexTableRef != null) {
 +                    ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
 +                    rs.getCurrentRow().getKey(indexPtr);
 +                    indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
 +                }
 +                if (mutations.size() > maxSize) {
 +                    throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
 +                }
 +                rowCount++;
 +                // Commit a batch if auto commit is true and we're at our batch size
 +                if (isAutoCommit && rowCount % batchSize == 0) {
 +                    MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection);
 +                    connection.getMutationState().join(state);
 +                    if (indexTableRef != null) {
 +                        MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
 +                        connection.getMutationState().join(indexState);
 +                    }
 +                    connection.getMutationState().send();
 +                    mutations.clear();
 +                    if (indexMutations != null) {
 +                        indexMutations.clear();
 +                    }
 +                }
 +            }
 +
 +            // If auto commit is true, this last batch will be committed upon return
 +            int nCommittedRows = rowCount / batchSize * batchSize;
 +            MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection);
 +            if (indexTableRef != null) {
 +                // To prevent the counting of these index rows, the index state is created with 0 rather than nCommittedRows.
 +                MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
 +                state.join(indexState);
 +            }
 +            return state;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
index a72793a,0000000..5bc2c0d
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
@@@ -1,138 -1,0 +1,143 @@@
 +package org.apache.phoenix.calcite.rel;
 +
 +import java.sql.SQLException;
 +import java.util.List;
 +
 +import org.apache.calcite.adapter.enumerable.EnumerableConvention;
 +import org.apache.calcite.adapter.enumerable.EnumerableRel;
 +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
 +import org.apache.calcite.adapter.enumerable.JavaRowFormat;
 +import org.apache.calcite.adapter.enumerable.PhysType;
 +import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
 +import org.apache.calcite.linq4j.tree.BlockBuilder;
 +import org.apache.calcite.linq4j.tree.Expression;
 +import org.apache.calcite.linq4j.tree.Expressions;
 +import org.apache.calcite.linq4j.tree.MethodCallExpression;
 +import org.apache.calcite.linq4j.tree.ParameterExpression;
 +import org.apache.calcite.plan.ConventionTraitDef;
 +import org.apache.calcite.plan.RelOptCluster;
 +import org.apache.calcite.plan.RelOptCost;
 +import org.apache.calcite.plan.RelOptPlanner;
 +import org.apache.calcite.plan.RelTraitSet;
 +import org.apache.calcite.rel.RelNode;
 +import org.apache.calcite.rel.convert.ConverterImpl;
 +import org.apache.calcite.rel.metadata.RelMetadataQuery;
 +import org.apache.calcite.rel.type.RelDataType;
 +import org.apache.calcite.util.ImmutableIntList;
++import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.calcite.BuiltInMethod;
 +import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
 +import org.apache.phoenix.compile.ExplainPlan;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.StatementPlan;
 +import org.apache.phoenix.execute.DelegateQueryPlan;
 +import org.apache.phoenix.execute.RuntimeContext;
 +import org.apache.phoenix.execute.RuntimeContextImpl;
 +import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 +import org.apache.phoenix.iterate.ParallelScanGrouper;
 +import org.apache.phoenix.iterate.ResultIterator;
 +
 +/**
 + * Converter that turns a Phoenix relational expression into an Enumerable one.
 + */
 +public class PhoenixToEnumerableConverter extends ConverterImpl implements EnumerableRel {
 +
 +    public static PhoenixToEnumerableConverter create(RelNode input) {
 +        RelOptCluster cluster = input.getCluster();
 +        RelTraitSet traits = input.getTraitSet().replace(EnumerableConvention.INSTANCE);
 +        return new PhoenixToEnumerableConverter(cluster, traits, input);
 +    }
 +
 +    private PhoenixToEnumerableConverter(
 +        RelOptCluster cluster,
 +        RelTraitSet traits,
 +        RelNode input) {
 +        super(cluster, ConventionTraitDef.INSTANCE, traits, input);
 +    }
 +
 +    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
 +        return create(sole(inputs));
 +    }
 +
 +    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
 +        return super.computeSelfCost(planner, mq)
 +                .multiplyBy(.1)
 +                .multiplyBy(PhoenixRel.PHOENIX_FACTOR)
 +                .multiplyBy(PhoenixRel.SERVER_FACTOR);
 +    }
 +
 +    @Override
 +    public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
 +        // Generates code that instantiates a result iterator, then converts it
 +        // to an enumerable.
 +        //
 +        //   ResultIterator iterator = root.get("x");
 +        //   return CalciteRuntime.toEnumerable(iterator);
 +        final BlockBuilder list = new BlockBuilder();
 +        StatementPlan plan = makePlan((PhoenixRel)getInput());
 +        Expression var = stash(implementor, plan, StatementPlan.class);
 +        final RelDataType rowType = getRowType();
 +        final PhysType physType =
 +            PhysTypeImpl.of(
 +                implementor.getTypeFactory(), rowType,
 +                pref.prefer(JavaRowFormat.ARRAY));
 +        final Expression iterator_ =
 +            list.append("iterator", var);
 +        final Expression enumerable_ =
 +            list.append("enumerable",
 +                Expressions.call(BuiltInMethod.TO_ENUMERABLE.method,
 +                    iterator_));
 +        list.add(Expressions.return_(null, enumerable_));
 +        return implementor.result(physType, list.toBlock());
 +    }
 +    
 +    static StatementPlan makePlan(PhoenixRel rel) {
 +        RuntimeContext runtimeContext = new RuntimeContextImpl();
 +        RuntimeContext.THREAD_LOCAL.get().add(runtimeContext);
 +        final PhoenixRelImplementor phoenixImplementor = new PhoenixRelImplementorImpl(runtimeContext);
 +        phoenixImplementor.pushContext(new ImplementorContext(true, false, ImmutableIntList.identity(rel.getRowType().getFieldCount())));
 +        final StatementPlan plan = rel.implement(phoenixImplementor);
 +        if (!(plan instanceof QueryPlan)) {
 +            return plan;
 +        }
 +            
 +        return new DelegateQueryPlan((QueryPlan) plan) {
 +            @Override
 +            public ResultIterator iterator() throws SQLException {
 +                return iterator(DefaultParallelScanGrouper.getInstance());
 +            }
 +            @Override
 +            public ExplainPlan getExplainPlan() throws SQLException {
 +                return delegate.getExplainPlan();
 +            }
 +            @Override
 +            public RowProjector getProjector() {
 +                return phoenixImplementor.getTableMapping().createRowProjector();
 +            }
 +            @Override
 +            public ResultIterator iterator(ParallelScanGrouper scanGrouper)
 +                    throws SQLException {
 +                return delegate.iterator(scanGrouper);
 +            }
 +            @Override
 +            public QueryPlan limit(Integer limit) {
 +                return delegate.limit(limit);
 +            }
++            @Override
++            public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
++                return delegate.iterator(scanGrouper, scan);
++            }
 +        };
 +    }
 +
 +    @SuppressWarnings({ "rawtypes", "unchecked" })
 +    static Expression stash(EnumerableRelImplementor implementor, Object o, Class clazz) {
 +        ParameterExpression x = (ParameterExpression) implementor.stash(o, clazz);
 +        MethodCallExpression e =
 +            Expressions.call(implementor.getRootExpression(),
 +                org.apache.calcite.util.BuiltInMethod.DATA_CONTEXT_GET.method,
 +                Expressions.constant(x.name));
 +        return Expressions.convert_(e, clazz);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 6909b23,4d3c0cf..bbc995c
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@@ -175,12 -177,12 +177,12 @@@ public class TupleProjectionCompiler 
              projectedColumns.add(column);
          }
          return PTableImpl.makePTable(table.getTenantId(), PROJECTED_TABLE_SCHEMA, table.getName(), PTableType.PROJECTED,
 -                null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
 -                retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null,
 -                Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null,
 -                table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(),
 -                table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(),
 -                table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
 +                    null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
 +                    retainPKColumns ? table.getBucketNum() : null, projectedColumns, null,
 +                    null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
 +                    table.isWALDisabled(), retainPKColumns ? table.isMultiTenant() : false, table.getStoreNulls(), table.getViewType(),
 +                    retainPKColumns ? table.getViewIndexId() : null, null, table.rowKeyOrderOptimizable(), table.isTransactional(),
-                     table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
++                    table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
      }
  
      // For extracting column references from single select statement

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 5ab8e3a,00d478a..cd83c4d
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@@ -77,10 -80,9 +80,12 @@@ public class AggregatePlan extends Base
      private final Expression having;
      private List<KeyRange> splits;
      private List<List<Scan>> scans;
-     
+     private static final Logger logger = LoggerFactory.getLogger(AggregatePlan.class);
+     private boolean isSerial;
 -    
++        
 +    public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) {
 +        return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getSourceRefs().iterator().next(), plan.getProjector(), null, null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving(), plan.dynamicFilter);
 +    }
  
      public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
              RowProjector projector, Integer limit, Integer offset, OrderBy orderBy,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 56930ba,fc5a04d..9e6dad3
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@@ -21,9 -21,9 +21,10 @@@ import java.io.IOException
  import java.sql.SQLException;
  import java.util.List;
  
+ import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.phoenix.compile.ExplainPlan;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
  import org.apache.phoenix.compile.QueryPlan;
  import org.apache.phoenix.exception.SQLExceptionCode;
  import org.apache.phoenix.exception.SQLExceptionInfo;
@@@ -105,10 -104,14 +106,15 @@@ public class CorrelatePlan extends Dele
      }
  
      @Override
-     public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+     public ResultIterator iterator(ParallelScanGrouper scanGrouper) 
+                 throws SQLException {
+         return iterator(scanGrouper, null);
+     }
+     @Override
+     public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan)
              throws SQLException {
          return new ResultIterator() {
 +            private final CorrelateVariable variable = runtimeContext.getCorrelateVariable(variableId);
              private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema);
              private final ValueBitSet lhsBitSet = ValueBitSet.newInstance(lhsSchema);
              private final ValueBitSet rhsBitSet = 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 8f4711c,0000000..31ad0e9
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@@ -1,70 -1,0 +1,70 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.phoenix.execute;
 +
 +import java.sql.SQLException;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import org.apache.hadoop.hbase.client.Scan;
 +import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 +import org.apache.phoenix.compile.QueryPlan;
 +import org.apache.phoenix.compile.RowProjector;
 +import org.apache.phoenix.compile.ScanRanges;
 +import org.apache.phoenix.compile.StatementContext;
 +import org.apache.phoenix.iterate.ParallelScanGrouper;
 +import org.apache.phoenix.iterate.ResultIterator;
 +import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 +import org.apache.phoenix.parse.FilterableStatement;
 +import org.apache.phoenix.query.KeyRange;
 +import org.apache.phoenix.schema.TableRef;
 +
 +public class DegenerateQueryPlan extends BaseQueryPlan {
 +
 +    public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
 +        super(context, statement, table, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null,null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null);
 +        context.setScanRanges(ScanRanges.NOTHING);
 +    }
 +
 +    @Override
 +    public List<KeyRange> getSplits() {
 +        return Collections.emptyList();
 +    }
 +
 +    @Override
 +    public List<List<Scan>> getScans() {
 +        return Collections.emptyList();
 +    }
 +
 +    @Override
-     protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
++    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
 +        return null;
 +    }
 +
 +    @Override
 +    public boolean useRoundRobinIterator() {
 +        return false;
 +    }
 +
 +    @Override
 +    public QueryPlan limit(Integer limit) {
 +        return this;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index b085f08,7f735b7..d3d000b
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@@ -81,17 -80,15 +81,19 @@@ public class ScanPlan extends BaseQuery
      private List<KeyRange> splits;
      private List<List<Scan>> scans;
      private boolean allowPageFilter;
+     private boolean isSerial;
+     private boolean isDataToScanWithinThreshold;
 +
 +    public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException {
 +        return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getSourceRefs().iterator().next(), plan.getProjector(), null, null, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter, plan.dynamicFilter);
 +    }
      
      public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
 -        this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null);
 +        this(context, statement, table, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null);
      }
      
 -    private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
 -        super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit,offset, orderBy, GroupBy.EMPTY_GROUP_BY,
 +    public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, TableRef srcRef, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
 +        super(context, statement, table, srcRef, projector, context.getBindManager().getParameterMetaData(), limit,offset, orderBy, GroupBy.EMPTY_GROUP_BY,
                  parallelIteratorFactory != null ? parallelIteratorFactory :
                          buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter);
          this.allowPageFilter = allowPageFilter;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 12db6d9,94c59df..137a632
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.phoenix.execute
  import java.sql.SQLException;
  import java.util.List;
  
+ import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
  import org.apache.phoenix.compile.ExplainPlan;
 +import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
  import org.apache.phoenix.compile.QueryPlan;
  import org.apache.phoenix.expression.BaseSingleExpression;
  import org.apache.phoenix.expression.BaseTerminalExpression;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index e26660e,8b9adfd..20bda5e
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@@ -114,11 -119,12 +117,11 @@@ public abstract class BaseResultIterato
  
      private final List<List<Scan>> scans;
      private final List<KeyRange> splits;
 -    private final PTableStats tableStats;
      private final byte[] physicalTableName;
-     private final QueryPlan plan;
+     protected final QueryPlan plan;
      protected final String scanId;
      protected final MutationState mutationState;
-     private final ParallelScanGrouper scanGrouper;
+     protected final ParallelScanGrouper scanGrouper;
      // TODO: too much nesting here - breakup into new classes.
      private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
      private Long estimatedRows;
@@@ -136,17 -143,14 +140,20 @@@
          return plan.getTableRef().getTable();
      }
      
+     protected boolean useStats() {
++        return useStats(this.context);
++    }
++    
 +    private static boolean useStats(StatementContext context) {
 +        Scan scan = context.getScan();
 +        boolean isPointLookup = context.getScanRanges().isPointLookup();
          /*
-          *  Don't use guide posts if:
-          *  1) We're doing a point lookup, as HBase is fast enough at those
-          *     to not need them to be further parallelized. TODO: perf test to verify
-          *  2) We're collecting stats, as in this case we need to scan entire
-          *     regions worth of data to track where to put the guide posts.
+          * Don't use guide posts:
+          * 1) If we're collecting stats, as in this case we need to scan entire
+          * regions worth of data to track where to put the guide posts.
+          * 2) If the query is going to be executed serially.
           */
-         if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
+         if (ScanUtil.isAnalyzeTable(scan)) {
              return false;
          }
          return true;
@@@ -342,10 -358,17 +361,10 @@@
          TableRef tableRef = plan.getTableRef();
          PTable table = tableRef.getTable();
          physicalTableName = table.getPhysicalName().getBytes();
 -        Long currentSCN = context.getConnection().getSCN();
 -        if (null == currentSCN) {
 -          currentSCN = HConstants.LATEST_TIMESTAMP;
 -        }
 -        tableStats = useStats() && StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))
 -                ? context.getConnection().getQueryServices().getTableStats(physicalTableName, currentSCN)
 -                : PTableStats.EMPTY_STATS;
          // Used to tie all the scans together during logging
-         scanId = UUID.randomUUID().toString();
+         scanId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString();
          
-         initializeScan(plan, perScanLimit, offset);
+         initializeScan(plan, perScanLimit, offset, scan);
          
          this.scans = getParallelScans();
          List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
@@@ -401,18 -424,13 +420,13 @@@
          return guideIndex;
      }
  
-     private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions) throws SQLException {
-         /*
-          * Don't use guide posts if: 1) We're doing a point lookup, as HBase is fast enough at those to not need them to
-          * be further parallelized. TODO: pref test to verify 2) We're collecting stats, as in this case we need to scan
-          * entire regions worth of data to track where to put the guide posts.
-          */
-         if (!useStats(context)) { return GuidePostsInfo.NO_GUIDEPOST; }
 -    private GuidePostsInfo getGuidePosts(Set<byte[]> whereConditions) {
 -        if (!useStats()) { return GuidePostsInfo.NO_GUIDEPOST; }
++    private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions, boolean useStats) throws SQLException {
++        if (!useStats) { return GuidePostsInfo.NO_GUIDEPOST; }
  
          GuidePostsInfo gps = null;
 -        PTable table = getTable();
 +        PTableStats tableStats = new MetaDataClient(context.getConnection()).getTableStats(table);
          Map<byte[], GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
 -        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
 +        byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
          if (table.getColumnFamilies().isEmpty()) {
              // For sure we can get the defaultCF from the table
              gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF);
@@@ -495,7 -571,7 +567,7 @@@
                  whereConditions.add(cf);
              }
          }
-         GuidePostsInfo gps = getGuidePosts(context, table, whereConditions);
 -        GuidePostsInfo gps = getGuidePosts(whereConditions);
++        GuidePostsInfo gps = getGuidePosts(context, table, whereConditions, useStats());
          hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
          boolean traverseAllRegions = isSalted || isLocalIndex;
          if (!traverseAllRegions) {
@@@ -612,130 -686,6 +682,130 @@@
          return parallelScans;
      }
  
 +
 +    /**
 +     * Compute the estimated count of rows and bytes that will be scanned.
 +     * @return the estimated row count and the byte count.
 +     * @throws SQLException
 +     */
 +    public static Pair<Long, Long> getEstimatedCount(StatementContext context, PTable table) throws SQLException {
 +        if (table.getName() == null) { // empty table
 +            return new Pair<Long, Long>(null, null);
 +        }
 +        
 +        if (context.getScanRanges().isPointLookup()) {
 +            return new Pair<Long, Long>(1L, SchemaUtil.estimateRowSize(table));
 +        }
 +        
 +        TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
 +        for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
 +            byte[] cf = where.getFirst();
 +            if (cf != null) {
 +                whereConditions.add(cf);
 +            }
 +        }
-         GuidePostsInfo gps = getGuidePosts(context, table, whereConditions);
++        GuidePostsInfo gps = getGuidePosts(context, table, whereConditions, useStats(context));
 +        if (gps == GuidePostsInfo.NO_GUIDEPOST) {
 +            return new Pair<Long, Long>(null, null);
 +        }
 +        
 +        byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
 +        byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
 +        Scan scan = context.getScan();
 +        List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
 +                .getAllTableRegions(table.getPhysicalName().getBytes());
 +        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
 +        ScanRanges scanRanges = context.getScanRanges();
 +        boolean isSalted = table.getBucketNum() != null;
 +        boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
 +        boolean traverseAllRegions = isSalted || isLocalIndex;
 +        if (!traverseAllRegions) {
 +            byte[] scanStartRow = scan.getStartRow();
 +            if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
 +                startKey = scanStartRow;
 +            }
 +            byte[] scanStopRow = scan.getStopRow();
 +            if (stopKey.length == 0
 +                    || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
 +                stopKey = scanStopRow;
 +            }
 +        }
 +        
 +        int regionIndex = 0;
 +        int stopIndex = regionBoundaries.size();
 +        if (startKey.length > 0) {
 +            regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
 +        }
 +        if (stopKey.length > 0) {
 +            stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
 +            if (isLocalIndex) {
 +                stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
 +            }
 +        }
 +        
 +        ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
 +        
 +        int gpsSize = gps.getGuidePostsCount();
 +        int keyOffset = 0;
 +        ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
 +        ImmutableBytesWritable guidePosts = gps.getGuidePosts();
 +        ByteArrayInputStream stream = null;
 +        DataInput input = null;
 +        PrefixByteDecoder decoder = null;
 +        int guideIndex = 0;
 +        long estimatedRows = 0;
 +        long estimatedSize = 0;
 +        try {
 +            if (gpsSize > 0) {
 +                stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
 +                input = new DataInputStream(stream);
 +                decoder = new PrefixByteDecoder(gps.getMaxLength());
 +                try {
 +                    while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0
 +                            && currentKey.getLength() != 0) {
 +                        guideIndex++;
 +                    }
 +                } catch (EOFException e) {}
 +            }
 +            byte[] currentKeyBytes = currentKey.copyBytes();
 +    
 +            // Merge bisect with guideposts for all but the last region
 +            while (regionIndex <= stopIndex) {
 +                byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
 +                byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
 +                if (regionIndex == stopIndex) {
 +                    endKey = stopKey;
 +                } else {
 +                    endKey = regionBoundaries.get(regionIndex);
 +                }
 +                HRegionLocation regionLocation = regionLocations.get(regionIndex);
 +                if (isLocalIndex) {
 +                    HRegionInfo regionInfo = regionLocation.getRegionInfo();
 +                    endRegionKey = regionInfo.getEndKey();
 +                    keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
 +                }
 +                try {
 +                    while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
 +                        Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
 +                                false);
 +                        if (newScan != null) {
 +                            estimatedRows += gps.getRowCounts().get(guideIndex);
 +                            estimatedSize += gps.getByteCounts().get(guideIndex);
 +                        }
 +                        currentKeyBytes = currentGuidePost.copyBytes();
 +                        currentGuidePost = PrefixByteCodec.decode(decoder, input);
 +                        currentGuidePostBytes = currentGuidePost.copyBytes();
 +                        guideIndex++;
 +                    }
 +                } catch (EOFException e) {}
 +                currentKeyBytes = endKey;
 +                regionIndex++;
 +            }
 +        } finally {
 +            if (stream != null) Closeables.closeQuietly(stream);
 +        }
 +        return new Pair<Long, Long>(estimatedRows, estimatedSize);
 +    }
     
      public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
          if (!reverse) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 755e509,daef367..cb0bceb
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@@ -1484,9 -1593,9 +1594,9 @@@ public abstract class BaseTest 
          if (ts != null) {
              props.setProperty(CURRENT_SCN_ATTRIB, ts.toString());
          }
-         Connection conn = DriverManager.getConnection(getUrl(), props);
+         Connection conn = DriverManager.getConnection(url, props);
          try {
 -            conn.createStatement().execute("CREATE SEQUENCE my.seq");
 +            conn.createStatement().execute("CREATE SEQUENCE IF NOT EXISTS my.seq");
              // Insert into customer table
              PreparedStatement stmt = conn.prepareStatement(
                      "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME +

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 75ff40d,05fbf81..ecf31ab
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@@ -455,13 -471,8 +471,13 @@@ public class ParallelIteratorsSplitTes
              public boolean useRoundRobinIterator() {
                  return false;
              }
 +
 +            @Override
 +            public QueryPlan limit(Integer limit) {
 +                return this;
 +            }
              
-         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+         }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false);
          List<KeyRange> keyRanges = parallelIterators.getSplits();
          return keyRanges;
      }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --cc phoenix-core/src/test/resources/log4j.properties
index 96ebd73,85706b4..575ee67
--- a/phoenix-core/src/test/resources/log4j.properties
+++ b/phoenix-core/src/test/resources/log4j.properties
@@@ -61,4 -61,5 +61,6 @@@ log4j.logger.org.mortbay.log=WAR
  log4j.logger.org.apache.hadoop=WARN
  log4j.logger.org.apache.zookeeper=ERROR
  log4j.logger.org.apache.hadoop.hbase=DEBUG
 +log4j.logger.org.apache.calcite=INFO
+ log4j.logger.org.apache.directory=WARN
+ log4j.logger.net.sf.ehcache=WARN

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index c21fbe9,b5edb6c..49af7f2
--- a/pom.xml
+++ b/pom.xml
@@@ -92,10 -97,9 +97,10 @@@
      <jodatime.version>1.6</jodatime.version>
      <joni.version>2.1.2</joni.version>
      <avatica.version>1.8.0</avatica.version>
 +    <calcite.version>1.9.0-SNAPSHOT</calcite.version>
      <jettyVersion>8.1.7.v20120910</jettyVersion>
-     <tephra.version>0.7.0</tephra.version>
-     <spark.version>1.5.2</spark.version>
+     <tephra.version>0.8.0-incubating</tephra.version>
+     <spark.version>1.6.1</spark.version>
      <scala.version>2.10.4</scala.version>
      <scala.binary.version>2.10</scala.binary.version>