Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/09/14 21:43:46 UTC
[48/50] [abbrv] phoenix git commit: Fix compilation errors from sync with master
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
index 1b83be2,0000000..0915d09
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixTableModify.java
@@@ -1,395 -1,0 +1,395 @@@
+package org.apache.phoenix.calcite.rel;
+
+import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
+
+import java.sql.ParameterMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.prepare.Prepare.CatalogReader;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableModify;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.calcite.PhoenixTable;
+import org.apache.phoenix.compile.ColumnResolver;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.FromCompiler;
+import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.SequenceManager;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PName;
+import org.apache.phoenix.schema.PRow;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class PhoenixTableModify extends TableModify implements PhoenixRel {
+
+ public PhoenixTableModify(RelOptCluster cluster, RelTraitSet traits,
+ RelOptTable table, CatalogReader catalogReader, RelNode child,
+ Operation operation, List<String> updateColumnList, boolean flattened) {
+ super(cluster, traits, table, catalogReader, child, operation, updateColumnList, flattened);
+ assert operation == Operation.INSERT || operation == Operation.DELETE;
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new PhoenixTableModify(
+ getCluster(),
+ traitSet,
+ getTable(),
+ getCatalogReader(),
+ sole(inputs),
+ getOperation(),
+ getUpdateColumnList(),
+ isFlattened());
+ }
+
+ @Override
+ public StatementPlan implement(PhoenixRelImplementor implementor) {
+ final QueryPlan queryPlan = implementor.visitInput(0, (PhoenixQueryRel) input);
+ final RowProjector projector = implementor.getTableMapping().createRowProjector();
+
+ final PhoenixTable targetTable = getTable().unwrap(PhoenixTable.class);
+ final PhoenixConnection connection = targetTable.pc;
+ final TableRef targetTableRef = targetTable.tableMapping.getTableRef();
+
+ if (getOperation() == Operation.INSERT) {
+ return upsert(connection, targetTable, targetTableRef, queryPlan, projector);
+ }
+
+ // delete
+ return delete(connection, targetTable, targetTableRef, queryPlan, projector);
+ }
+
+ private static MutationPlan upsert(final PhoenixConnection connection,
+ final PhoenixTable targetTable, final TableRef targetTableRef,
+ final QueryPlan queryPlan, final RowProjector projector) {
+ try (PhoenixStatement stmt = new PhoenixStatement(connection)) {
+ final ColumnResolver resolver = FromCompiler.getResolver(targetTableRef);
+ final StatementContext context = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
+
+ // TODO TenantId, ViewIndexId, UpdatableViewColumns
+ final List<PColumn> mappedColumns = targetTable.tableMapping.getMappedColumns();
+ final int[] columnIndexes = new int[mappedColumns.size()];
+ final int[] pkSlotIndexes = new int[mappedColumns.size()];
+ for (int i = 0; i < columnIndexes.length; i++) {
+ PColumn column = mappedColumns.get(i);
+ if (SchemaUtil.isPKColumn(column)) {
+ pkSlotIndexes[i] = column.getPosition();
+ }
+ columnIndexes[i] = column.getPosition();
+ }
+ // TODO
+ final boolean useServerTimestamp = false;
+
+ return new MutationPlan() {
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return queryPlan.getContext().getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return targetTableRef;
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ // TODO return originalQueryPlan.getSourceRefs();
+ return queryPlan.getSourceRefs();
+ }
+
+ @Override
+ public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
+ return org.apache.phoenix.jdbc.PhoenixStatement.Operation.UPSERT;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ ResultIterator iterator = queryPlan.iterator();
+ // simplest version, no run-on-server, no pipelined update
+ StatementContext childContext = queryPlan.getContext();
+ ConnectionQueryServices services = connection.getQueryServices();
+ int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,
+ QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+ boolean isAutoCommit = connection.getAutoCommit();
+ byte[][] values = new byte[columnIndexes.length][];
+ int rowCount = 0;
+ Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+ PTable table = targetTableRef.getTable();
+ try (ResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
+ ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+ while (rs.next()) {
+ for (int i = 0; i < values.length; i++) {
+ PColumn column = table.getColumns().get(columnIndexes[i]);
+ byte[] bytes = rs.getBytes(i + 1);
+ ptr.set(bytes == null ? ByteUtil.EMPTY_BYTE_ARRAY : bytes);
+ Object value = rs.getObject(i + 1);
+ int rsPrecision = rs.getMetaData().getPrecision(i + 1);
+ Integer precision = rsPrecision == 0 ? null : rsPrecision;
+ int rsScale = rs.getMetaData().getScale(i + 1);
+ Integer scale = rsScale == 0 ? null : rsScale;
+ // We are guaranteed that the two columns will have compatible types,
+ // as we checked that before.
+ if (!column.getDataType().isSizeCompatible(ptr, value, column.getDataType(), precision, scale,
+ column.getMaxLength(), column.getScale())) { throw new SQLExceptionInfo.Builder(
+ SQLExceptionCode.DATA_EXCEEDS_MAX_CAPACITY).setColumnName(column.getName().getString())
+ .setMessage("value=" + column.getDataType().toStringLiteral(ptr, null)).build()
+ .buildException(); }
+ column.getDataType().coerceBytes(ptr, value, column.getDataType(),
+ precision, scale, SortOrder.getDefault(),
+ column.getMaxLength(), column.getScale(), column.getSortOrder(),
+ table.rowKeyOrderOptimizable());
+ values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
+ }
+ setValues(values, pkSlotIndexes, columnIndexes, table, mutation, connection, useServerTimestamp);
+ rowCount++;
+ // Commit a batch if auto commit is true and we're at our batch size
+ if (isAutoCommit && rowCount % batchSize == 0) {
+ MutationState state = new MutationState(targetTableRef, mutation, 0, maxSize, connection);
+ connection.getMutationState().join(state);
+ connection.getMutationState().send();
+ mutation.clear();
+ }
+ }
+ // If auto commit is true, this last batch will be committed upon return
+ return new MutationState(targetTableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection);
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("UPSERT SELECT");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+ };
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixConnection connection, boolean useServerTimestamp) {
+ Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
+ byte[][] pkValues = new byte[table.getPKColumns().size()][];
+ // If the table uses salting, the first byte is the salting byte, set to an empty array
+ // here and we will fill in the byte later in PRowImpl.
+ if (table.getBucketNum() != null) {
+ pkValues[0] = new byte[] {0};
+ }
+ Long rowTimestamp = null; // case when the table doesn't have a row timestamp column
+ RowTimestampColInfo rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
+ for (int i = 0; i < values.length; i++) {
+ byte[] value = values[i];
+ PColumn column = table.getColumns().get(columnIndexes[i]);
+ if (SchemaUtil.isPKColumn(column)) {
+ pkValues[pkSlotIndex[i]] = value;
+ if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) {
+ if (!useServerTimestamp) {
+ PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos());
+ rowTimestamp = PLong.INSTANCE.getCodec().decodeLong(value, 0, rowTimestampCol.getSortOrder());
+ if (rowTimestamp < 0) {
+ throw new IllegalDataException("Value of a column designated as ROW_TIMESTAMP cannot be less than zero");
+ }
+ rowTsColInfo = new RowTimestampColInfo(useServerTimestamp, rowTimestamp);
+ }
+ }
+ } else {
+ columnValues.put(column, value);
+ }
+ }
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr();
+ table.newKey(ptr, pkValues);
+ mutation.put(ptr, new RowMutationState(columnValues, connection.getStatementExecutionCounter(), rowTsColInfo));
+ }
+
+ private static MutationPlan delete(final PhoenixConnection connection,
+ final PhoenixTable targetTable, final TableRef targetTableRef,
+ final QueryPlan queryPlan, final RowProjector projector) {
+ final StatementContext context = queryPlan.getContext();
+ // TODO
+ final boolean deleteFromImmutableIndexToo = false;
+ return new MutationPlan() {
+ @Override
+ public ParameterMetaData getParameterMetaData() {
+ return context.getBindManager().getParameterMetaData();
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public TableRef getTargetRef() {
+ return targetTableRef;
+ }
+
+ @Override
+ public Set<TableRef> getSourceRefs() {
+ // TODO dataPlan.getSourceRefs();
+ return queryPlan.getSourceRefs();
+ }
+
+ @Override
+ public org.apache.phoenix.jdbc.PhoenixStatement.Operation getOperation() {
+ return org.apache.phoenix.jdbc.PhoenixStatement.Operation.DELETE;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ ResultIterator iterator = queryPlan.iterator();
+ try {
+ // TODO hasLimit??
+ return deleteRows(context, targetTableRef, deleteFromImmutableIndexToo ? queryPlan.getTableRef() : null, iterator, projector, queryPlan.getTableRef());
+ } finally {
+ iterator.close();
+ }
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps();
+ List<String> planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1);
+ planSteps.add("DELETE ROWS");
+ planSteps.addAll(queryPlanSteps);
+ return new ExplainPlan(planSteps);
+ }
+ };
+ }
+
+ private static MutationState deleteRows(StatementContext childContext, TableRef targetTableRef, TableRef indexTableRef, ResultIterator iterator, RowProjector projector, TableRef sourceTableRef) throws SQLException {
+ PTable table = targetTableRef.getTable();
+ PhoenixStatement statement = childContext.getStatement();
+ PhoenixConnection connection = statement.getConnection();
+ PName tenantId = connection.getTenantId();
+ byte[] tenantIdBytes = null;
+ if (tenantId != null) {
- tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId);
++ tenantIdBytes = ScanUtil.getTenantIdBytes(table.getRowKeySchema(), table.getBucketNum() != null, tenantId, table.getViewIndexId() != null);
+ }
+ final boolean isAutoCommit = connection.getAutoCommit();
+ ConnectionQueryServices services = connection.getQueryServices();
+ final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
+ final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
+ Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
+ Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
+ // If indexTableRef is set, we're deleting the rows from both the index table and
+ // the data table through a single query to save executing an additional one.
+ if (indexTableRef != null) {
+ indexMutations = Maps.newHashMapWithExpectedSize(batchSize);
+ }
+ List<PColumn> pkColumns = table.getPKColumns();
+ boolean isMultiTenant = table.isMultiTenant() && tenantIdBytes != null;
+ boolean isSharedViewIndex = table.getViewIndexId() != null;
+ int offset = (table.getBucketNum() == null ? 0 : 1);
+ byte[][] values = new byte[pkColumns.size()][];
+ if (isMultiTenant) {
+ values[offset++] = tenantIdBytes;
+ }
+ if (isSharedViewIndex) {
+ values[offset++] = MetaDataUtil.getViewIndexIdDataType().toBytes(table.getViewIndexId());
+ }
+ try (PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, childContext)) {
+ int rowCount = 0;
+ while (rs.next()) {
+ ImmutableBytesPtr ptr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
+ // Use tuple directly, as projector would not have all the PK columns from
+ // our index table inside of our projection. Since the tables are equal,
+ // there's no translation required.
+ if (sourceTableRef.equals(targetTableRef)) {
+ rs.getCurrentRow().getKey(ptr);
+ } else {
+ for (int i = offset; i < values.length; i++) {
+ byte[] byteValue = rs.getBytes(i+1-offset);
+ // The ResultSet.getBytes() call will have inverted it - we need to invert it back.
+ // TODO: consider going under the hood and just getting the bytes
+ if (pkColumns.get(i).getSortOrder() == SortOrder.DESC) {
+ byte[] tempByteValue = Arrays.copyOf(byteValue, byteValue.length);
+ byteValue = SortOrder.invert(byteValue, 0, tempByteValue, 0, byteValue.length);
+ }
+ values[i] = byteValue;
+ }
+ table.newKey(ptr, values);
+ }
+ // When issuing deletes, we do not care about the row time ranges. Also, if the table had a row timestamp column, then the
+ // row key will already have its value.
+ mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+ if (indexTableRef != null) {
+ ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
+ rs.getCurrentRow().getKey(indexPtr);
+ indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO));
+ }
+ if (mutations.size() > maxSize) {
+ throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
+ }
+ rowCount++;
+ // Commit a batch if auto commit is true and we're at our batch size
+ if (isAutoCommit && rowCount % batchSize == 0) {
+ MutationState state = new MutationState(targetTableRef, mutations, 0, maxSize, connection);
+ connection.getMutationState().join(state);
+ if (indexTableRef != null) {
+ MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
+ connection.getMutationState().join(indexState);
+ }
+ connection.getMutationState().send();
+ mutations.clear();
+ if (indexMutations != null) {
+ indexMutations.clear();
+ }
+ }
+ }
+
+ // If auto commit is true, this last batch will be committed upon return
+ int nCommittedRows = rowCount / batchSize * batchSize;
+ MutationState state = new MutationState(targetTableRef, mutations, nCommittedRows, maxSize, connection);
+ if (indexTableRef != null) {
+ // To prevent the counting of these index rows, we have a negative for remainingRows.
+ MutationState indexState = new MutationState(indexTableRef, indexMutations, 0, maxSize, connection);
+ state.join(indexState);
+ }
+ return state;
+ }
+ }
+}
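The auto-commit path in both execute() implementations above flushes a full batch every batchSize rows and returns a MutationState whose committed-row count is rowCount / batchSize * batchSize. A minimal, standalone sketch of that batching arithmetic follows; sendBatch() is a hypothetical stand-in for joining and sending the connection's MutationState, not a Phoenix API.

public class BatchingSketch {
    static int pendingRows = 0;

    // Hypothetical stand-in for connection.getMutationState().join(state) + send()
    static void sendBatch() {
        pendingRows = 0;
    }

    public static void main(String[] args) {
        final int batchSize = 3;
        int rowCount = 0;
        for (int row = 0; row < 10; row++) {      // pretend the query returned 10 rows
            pendingRows++;
            rowCount++;
            if (rowCount % batchSize == 0) {      // commit a full batch in auto-commit mode
                sendBatch();
            }
        }
        // Rows already flushed in full batches; the remainder rides in the returned state.
        int committed = rowCount / batchSize * batchSize;   // 9
        int remaining = rowCount - committed;               // 1
        System.out.println("committed=" + committed + " remaining=" + remaining);
    }
}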
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
index a72793a,0000000..5bc2c0d
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/calcite/rel/PhoenixToEnumerableConverter.java
@@@ -1,138 -1,0 +1,143 @@@
+package org.apache.phoenix.calcite.rel;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRel;
+import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
+import org.apache.calcite.adapter.enumerable.JavaRowFormat;
+import org.apache.calcite.adapter.enumerable.PhysType;
+import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.MethodCallExpression;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterImpl;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
++import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.calcite.BuiltInMethod;
+import org.apache.phoenix.calcite.rel.PhoenixRelImplementor.ImplementorContext;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementPlan;
+import org.apache.phoenix.execute.DelegateQueryPlan;
+import org.apache.phoenix.execute.RuntimeContext;
+import org.apache.phoenix.execute.RuntimeContextImpl;
+import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+
+/**
+ * Scan of a Phoenix table.
+ */
+public class PhoenixToEnumerableConverter extends ConverterImpl implements EnumerableRel {
+
+ public static PhoenixToEnumerableConverter create(RelNode input) {
+ RelOptCluster cluster = input.getCluster();
+ RelTraitSet traits = input.getTraitSet().replace(EnumerableConvention.INSTANCE);
+ return new PhoenixToEnumerableConverter(cluster, traits, input);
+ }
+
+ private PhoenixToEnumerableConverter(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode input) {
+ super(cluster, ConventionTraitDef.INSTANCE, traits, input);
+ }
+
+ @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return create(sole(inputs));
+ }
+
+ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.computeSelfCost(planner, mq)
+ .multiplyBy(.1)
+ .multiplyBy(PhoenixRel.PHOENIX_FACTOR)
+ .multiplyBy(PhoenixRel.SERVER_FACTOR);
+ }
+
+ @Override
+ public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
+ // Generates code that instantiates a result iterator, then converts it
+ // to an enumerable.
+ //
+ // ResultIterator iterator = root.get("x");
+ // return CalciteRuntime.toEnumerable(iterator);
+ final BlockBuilder list = new BlockBuilder();
+ StatementPlan plan = makePlan((PhoenixRel)getInput());
+ Expression var = stash(implementor, plan, StatementPlan.class);
+ final RelDataType rowType = getRowType();
+ final PhysType physType =
+ PhysTypeImpl.of(
+ implementor.getTypeFactory(), rowType,
+ pref.prefer(JavaRowFormat.ARRAY));
+ final Expression iterator_ =
+ list.append("iterator", var);
+ final Expression enumerable_ =
+ list.append("enumerable",
+ Expressions.call(BuiltInMethod.TO_ENUMERABLE.method,
+ iterator_));
+ list.add(Expressions.return_(null, enumerable_));
+ return implementor.result(physType, list.toBlock());
+ }
+
+ static StatementPlan makePlan(PhoenixRel rel) {
+ RuntimeContext runtimeContext = new RuntimeContextImpl();
+ RuntimeContext.THREAD_LOCAL.get().add(runtimeContext);
+ final PhoenixRelImplementor phoenixImplementor = new PhoenixRelImplementorImpl(runtimeContext);
+ phoenixImplementor.pushContext(new ImplementorContext(true, false, ImmutableIntList.identity(rel.getRowType().getFieldCount())));
+ final StatementPlan plan = rel.implement(phoenixImplementor);
+ if (!(plan instanceof QueryPlan)) {
+ return plan;
+ }
+
+ return new DelegateQueryPlan((QueryPlan) plan) {
+ @Override
+ public ResultIterator iterator() throws SQLException {
+ return iterator(DefaultParallelScanGrouper.getInstance());
+ }
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ return delegate.getExplainPlan();
+ }
+ @Override
+ public RowProjector getProjector() {
+ return phoenixImplementor.getTableMapping().createRowProjector();
+ }
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+ throws SQLException {
+ return delegate.iterator(scanGrouper);
+ }
+ @Override
+ public QueryPlan limit(Integer limit) {
+ return delegate.limit(limit);
+ }
++ @Override
++ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
++ return delegate.iterator(scanGrouper, scan);
++ }
+ };
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ static Expression stash(EnumerableRelImplementor implementor, Object o, Class clazz) {
+ ParameterExpression x = (ParameterExpression) implementor.stash(o, clazz);
+ MethodCallExpression e =
+ Expressions.call(implementor.getRootExpression(),
+ org.apache.calcite.util.BuiltInMethod.DATA_CONTEXT_GET.method,
+ Expressions.constant(x.name));
+ return Expressions.convert_(e, clazz);
+ }
+}
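The implement()/stash() pair above parks the compiled StatementPlan in Calcite's DataContext and generates code that fetches it back by key at run time (the root.get(...) call built in stash()). A rough, self-contained sketch of that stash-and-lookup pattern, with a plain Map standing in for the DataContext and a hypothetical key name:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class StashSketch {
    // Stand-in for Calcite's DataContext ("root") used by the generated code
    static final Map<String, Object> root = new HashMap<>();

    public static void main(String[] args) {
        Iterator<String> resultIterator = Arrays.asList("row1", "row2").iterator();
        root.put("plan0", resultIterator);        // "stash" the object at plan time (key is illustrative)

        @SuppressWarnings("unchecked")
        Iterator<String> it = (Iterator<String>) root.get("plan0");  // generated code: root.get(name), then cast
        while (it.hasNext()) {
            System.out.println(it.next());        // consume the rows, as the enumerable adapter would
        }
    }
}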
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
index 6909b23,4d3c0cf..bbc995c
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java
@@@ -175,12 -177,12 +177,12 @@@ public class TupleProjectionCompiler
projectedColumns.add(column);
}
return PTableImpl.makePTable(table.getTenantId(), PROJECTED_TABLE_SCHEMA, table.getName(), PTableType.PROJECTED,
- null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
- retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null,
- Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null,
- table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(),
- table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(),
- table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
+ null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
+ retainPKColumns ? table.getBucketNum() : null, projectedColumns, null,
+ null, Collections.<PTable>emptyList(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null,
+ table.isWALDisabled(), retainPKColumns ? table.isMultiTenant() : false, table.getStoreNulls(), table.getViewType(),
+ retainPKColumns ? table.getViewIndexId() : null, null, table.rowKeyOrderOptimizable(), table.isTransactional(),
- table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp());
++ table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema());
}
// For extracting column references from single select statement
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 5ab8e3a,00d478a..cd83c4d
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@@ -77,10 -80,9 +80,12 @@@ public class AggregatePlan extends Base
private final Expression having;
private List<KeyRange> splits;
private List<List<Scan>> scans;
-
+ private static final Logger logger = LoggerFactory.getLogger(AggregatePlan.class);
+ private boolean isSerial;
-
++
+ public static AggregatePlan create(AggregatePlan plan, OrderBy newOrderBy) {
+ return new AggregatePlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getSourceRefs().iterator().next(), plan.getProjector(), null, null, newOrderBy, plan.parallelIteratorFactory, plan.getGroupBy(), plan.getHaving(), plan.dynamicFilter);
+ }
public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table,
RowProjector projector, Integer limit, Integer offset, OrderBy orderBy,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 56930ba,fc5a04d..9e6dad3
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@@ -21,9 -21,9 +21,10 @@@ import java.io.IOException
import java.sql.SQLException;
import java.util.List;
+ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
@@@ -105,10 -104,14 +106,15 @@@ public class CorrelatePlan extends Dele
}
@Override
- public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+ throws SQLException {
+ return iterator(scanGrouper, null);
+ }
+ @Override
+ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan)
throws SQLException {
return new ResultIterator() {
+ private final CorrelateVariable variable = runtimeContext.getCorrelateVariable(variableId);
private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema);
private final ValueBitSet lhsBitSet = ValueBitSet.newInstance(lhsSchema);
private final ValueBitSet rhsBitSet =
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 8f4711c,0000000..31ad0e9
mode 100644,000000..100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@@ -1,70 -1,0 +1,70 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.schema.TableRef;
+
+public class DegenerateQueryPlan extends BaseQueryPlan {
+
+ public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) {
+ super(context, statement, table, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null,null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null);
+ context.setScanRanges(ScanRanges.NOTHING);
+ }
+
+ @Override
+ public List<KeyRange> getSplits() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<List<Scan>> getScans() {
+ return Collections.emptyList();
+ }
+
+ @Override
- protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) throws SQLException {
++ protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
+ return null;
+ }
+
+ @Override
+ public boolean useRoundRobinIterator() {
+ return false;
+ }
+
+ @Override
+ public QueryPlan limit(Integer limit) {
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index b085f08,7f735b7..d3d000b
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@@ -81,17 -80,15 +81,19 @@@ public class ScanPlan extends BaseQuery
private List<KeyRange> splits;
private List<List<Scan>> scans;
private boolean allowPageFilter;
+ private boolean isSerial;
+ private boolean isDataToScanWithinThreshold;
+
+ public static ScanPlan create(ScanPlan plan, OrderBy newOrderBy) throws SQLException {
+ return new ScanPlan(plan.getContext(), plan.getStatement(), plan.getTableRef(), plan.getSourceRefs().iterator().next(), plan.getProjector(), null, null, newOrderBy, plan.parallelIteratorFactory, plan.allowPageFilter, plan.dynamicFilter);
+ }
public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException {
- this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null);
+ this(context, statement, table, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null);
}
- private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
- super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit,offset, orderBy, GroupBy.EMPTY_GROUP_BY,
+ public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, TableRef srcRef, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException {
+ super(context, statement, table, srcRef, projector, context.getBindManager().getParameterMetaData(), limit,offset, orderBy, GroupBy.EMPTY_GROUP_BY,
parallelIteratorFactory != null ? parallelIteratorFactory :
buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter);
this.allowPageFilter = allowPageFilter;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 12db6d9,94c59df..137a632
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@@ -20,9 -20,9 +20,10 @@@ package org.apache.phoenix.execute
import java.sql.SQLException;
import java.util.List;
+ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.expression.BaseSingleExpression;
import org.apache.phoenix.expression.BaseTerminalExpression;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index e26660e,8b9adfd..20bda5e
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@@ -114,11 -119,12 +117,11 @@@ public abstract class BaseResultIterato
private final List<List<Scan>> scans;
private final List<KeyRange> splits;
- private final PTableStats tableStats;
private final byte[] physicalTableName;
- private final QueryPlan plan;
+ protected final QueryPlan plan;
protected final String scanId;
protected final MutationState mutationState;
- private final ParallelScanGrouper scanGrouper;
+ protected final ParallelScanGrouper scanGrouper;
// TODO: too much nesting here - breakup into new classes.
private final List<List<List<Pair<Scan,Future<PeekingResultIterator>>>>> allFutures;
private Long estimatedRows;
@@@ -136,17 -143,14 +140,20 @@@
return plan.getTableRef().getTable();
}
+ protected boolean useStats() {
++ return useStats(this.context);
++ }
++
+ private static boolean useStats(StatementContext context) {
+ Scan scan = context.getScan();
+ boolean isPointLookup = context.getScanRanges().isPointLookup();
/*
- * Don't use guide posts if:
- * 1) We're doing a point lookup, as HBase is fast enough at those
- * to not need them to be further parallelized. TODO: perf test to verify
- * 2) We're collecting stats, as in this case we need to scan entire
- * regions worth of data to track where to put the guide posts.
+ * Don't use guide posts:
+ * 1) If we're collecting stats, as in this case we need to scan entire
+ * regions worth of data to track where to put the guide posts.
+ * 2) If the query is going to be executed serially.
*/
- if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) {
+ if (ScanUtil.isAnalyzeTable(scan)) {
return false;
}
return true;
@@@ -342,10 -358,17 +361,10 @@@
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
physicalTableName = table.getPhysicalName().getBytes();
- Long currentSCN = context.getConnection().getSCN();
- if (null == currentSCN) {
- currentSCN = HConstants.LATEST_TIMESTAMP;
- }
- tableStats = useStats() && StatisticsUtil.isStatsEnabled(TableName.valueOf(physicalTableName))
- ? context.getConnection().getQueryServices().getTableStats(physicalTableName, currentSCN)
- : PTableStats.EMPTY_STATS;
// Used to tie all the scans together during logging
- scanId = UUID.randomUUID().toString();
+ scanId = new UUID(ThreadLocalRandom.current().nextLong(), ThreadLocalRandom.current().nextLong()).toString();
- initializeScan(plan, perScanLimit, offset);
+ initializeScan(plan, perScanLimit, offset, scan);
this.scans = getParallelScans();
List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION);
@@@ -401,18 -424,13 +420,13 @@@
return guideIndex;
}
- private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions) throws SQLException {
- /*
- * Don't use guide posts if: 1) We're doing a point lookup, as HBase is fast enough at those to not need them to
- * be further parallelized. TODO: pref test to verify 2) We're collecting stats, as in this case we need to scan
- * entire regions worth of data to track where to put the guide posts.
- */
- if (!useStats(context)) { return GuidePostsInfo.NO_GUIDEPOST; }
- private GuidePostsInfo getGuidePosts(Set<byte[]> whereConditions) {
- if (!useStats()) { return GuidePostsInfo.NO_GUIDEPOST; }
++ private static GuidePostsInfo getGuidePosts(StatementContext context, PTable table, Set<byte[]> whereConditions, boolean useStats) throws SQLException {
++ if (!useStats) { return GuidePostsInfo.NO_GUIDEPOST; }
GuidePostsInfo gps = null;
- PTable table = getTable();
+ PTableStats tableStats = new MetaDataClient(context.getConnection()).getTableStats(table);
Map<byte[], GuidePostsInfo> guidePostMap = tableStats.getGuidePosts();
- byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(getTable());
+ byte[] defaultCF = SchemaUtil.getEmptyColumnFamily(table);
if (table.getColumnFamilies().isEmpty()) {
// For sure we can get the defaultCF from the table
gps = getDefaultFamilyGuidePosts(guidePostMap, defaultCF);
@@@ -495,7 -571,7 +567,7 @@@
whereConditions.add(cf);
}
}
- GuidePostsInfo gps = getGuidePosts(context, table, whereConditions);
- GuidePostsInfo gps = getGuidePosts(whereConditions);
++ GuidePostsInfo gps = getGuidePosts(context, table, whereConditions, useStats());
hasGuidePosts = gps != GuidePostsInfo.NO_GUIDEPOST;
boolean traverseAllRegions = isSalted || isLocalIndex;
if (!traverseAllRegions) {
@@@ -612,130 -686,6 +682,130 @@@
return parallelScans;
}
+
+ /**
+ * Compute the estimated count of rows and bytes that will be scanned.
+ * @return the estimated row count and the byte count.
+ * @throws SQLException
+ */
+ public static Pair<Long, Long> getEstimatedCount(StatementContext context, PTable table) throws SQLException {
+ if (table.getName() == null) { // empty table
+ return new Pair<Long, Long>(null, null);
+ }
+
+ if (context.getScanRanges().isPointLookup()) {
+ return new Pair<Long, Long>(1L, SchemaUtil.estimateRowSize(table));
+ }
+
+ TreeSet<byte[]> whereConditions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ for(Pair<byte[], byte[]> where : context.getWhereConditionColumns()) {
+ byte[] cf = where.getFirst();
+ if (cf != null) {
+ whereConditions.add(cf);
+ }
+ }
- GuidePostsInfo gps = getGuidePosts(context, table, whereConditions);
++ GuidePostsInfo gps = getGuidePosts(context, table, whereConditions, useStats(context));
+ if (gps == GuidePostsInfo.NO_GUIDEPOST) {
+ return new Pair<Long, Long>(null, null);
+ }
+
+ byte[] startKey = ByteUtil.EMPTY_BYTE_ARRAY;
+ byte[] stopKey = ByteUtil.EMPTY_BYTE_ARRAY;
+ Scan scan = context.getScan();
+ List<HRegionLocation> regionLocations = context.getConnection().getQueryServices()
+ .getAllTableRegions(table.getPhysicalName().getBytes());
+ List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+ ScanRanges scanRanges = context.getScanRanges();
+ boolean isSalted = table.getBucketNum() != null;
+ boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL;
+ boolean traverseAllRegions = isSalted || isLocalIndex;
+ if (!traverseAllRegions) {
+ byte[] scanStartRow = scan.getStartRow();
+ if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) {
+ startKey = scanStartRow;
+ }
+ byte[] scanStopRow = scan.getStopRow();
+ if (stopKey.length == 0
+ || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) {
+ stopKey = scanStopRow;
+ }
+ }
+
+ int regionIndex = 0;
+ int stopIndex = regionBoundaries.size();
+ if (startKey.length > 0) {
+ regionIndex = getIndexContainingInclusive(regionBoundaries, startKey);
+ }
+ if (stopKey.length > 0) {
+ stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey));
+ if (isLocalIndex) {
+ stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey();
+ }
+ }
+
+ ImmutableBytesWritable currentKey = new ImmutableBytesWritable(startKey);
+
+ int gpsSize = gps.getGuidePostsCount();
+ int keyOffset = 0;
+ ImmutableBytesWritable currentGuidePost = ByteUtil.EMPTY_IMMUTABLE_BYTE_ARRAY;
+ ImmutableBytesWritable guidePosts = gps.getGuidePosts();
+ ByteArrayInputStream stream = null;
+ DataInput input = null;
+ PrefixByteDecoder decoder = null;
+ int guideIndex = 0;
+ long estimatedRows = 0;
+ long estimatedSize = 0;
+ try {
+ if (gpsSize > 0) {
+ stream = new ByteArrayInputStream(guidePosts.get(), guidePosts.getOffset(), guidePosts.getLength());
+ input = new DataInputStream(stream);
+ decoder = new PrefixByteDecoder(gps.getMaxLength());
+ try {
+ while (currentKey.compareTo(currentGuidePost = PrefixByteCodec.decode(decoder, input)) >= 0
+ && currentKey.getLength() != 0) {
+ guideIndex++;
+ }
+ } catch (EOFException e) {}
+ }
+ byte[] currentKeyBytes = currentKey.copyBytes();
+
+ // Merge bisect with guideposts for all but the last region
+ while (regionIndex <= stopIndex) {
+ byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
+ byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
+ if (regionIndex == stopIndex) {
+ endKey = stopKey;
+ } else {
+ endKey = regionBoundaries.get(regionIndex);
+ }
+ HRegionLocation regionLocation = regionLocations.get(regionIndex);
+ if (isLocalIndex) {
+ HRegionInfo regionInfo = regionLocation.getRegionInfo();
+ endRegionKey = regionInfo.getEndKey();
+ keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
+ }
+ try {
+ while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
+ Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset,
+ false);
+ if (newScan != null) {
+ estimatedRows += gps.getRowCounts().get(guideIndex);
+ estimatedSize += gps.getByteCounts().get(guideIndex);
+ }
+ currentKeyBytes = currentGuidePost.copyBytes();
+ currentGuidePost = PrefixByteCodec.decode(decoder, input);
+ currentGuidePostBytes = currentGuidePost.copyBytes();
+ guideIndex++;
+ }
+ } catch (EOFException e) {}
+ currentKeyBytes = endKey;
+ regionIndex++;
+ }
+ } finally {
+ if (stream != null) Closeables.closeQuietly(stream);
+ }
+ return new Pair<Long, Long>(estimatedRows, estimatedSize);
+ }
public static <T> List<T> reverseIfNecessary(List<T> list, boolean reverse) {
if (!reverse) {
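getEstimatedCount() above derives its row and byte estimates by walking the guideposts that intersect the scan range and summing their per-guidepost counts. A simplified, hypothetical sketch of that accumulation (the real code also intersects with region boundaries and handles salted tables and local indexes):

public class EstimateSketch {
    public static void main(String[] args) {
        long[] guidepostKeys = {10, 20, 30, 40};      // stand-ins for encoded guidepost row keys
        long[] rowCounts     = {100, 120, 90, 110};   // rows covered up to each guidepost
        long[] byteCounts    = {5000, 6000, 4500, 5500};
        long startKey = 15, stopKey = 35;             // scan range (hypothetical)

        long estRows = 0, estBytes = 0;
        for (int i = 0; i < guidepostKeys.length; i++) {
            // Count only guideposts falling inside the scanned key range
            if (guidepostKeys[i] >= startKey && guidepostKeys[i] < stopKey) {
                estRows += rowCounts[i];
                estBytes += byteCounts[i];
            }
        }
        System.out.println("estimatedRows=" + estRows + " estimatedBytes=" + estBytes);
    }
}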
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/parse/ColumnDef.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 755e509,daef367..cb0bceb
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@@ -1484,9 -1593,9 +1594,9 @@@ public abstract class BaseTest
if (ts != null) {
props.setProperty(CURRENT_SCN_ATTRIB, ts.toString());
}
- Connection conn = DriverManager.getConnection(getUrl(), props);
+ Connection conn = DriverManager.getConnection(url, props);
try {
- conn.createStatement().execute("CREATE SEQUENCE my.seq");
+ conn.createStatement().execute("CREATE SEQUENCE IF NOT EXISTS my.seq");
// Insert into customer table
PreparedStatement stmt = conn.prepareStatement(
"upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME +
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --cc phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 75ff40d,05fbf81..ecf31ab
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@@ -455,13 -471,8 +471,13 @@@ public class ParallelIteratorsSplitTes
public boolean useRoundRobinIterator() {
return false;
}
+
+ @Override
+ public QueryPlan limit(Integer limit) {
+ return this;
+ }
- }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+ }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false);
List<KeyRange> keyRanges = parallelIterators.getSplits();
return keyRanges;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/phoenix-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --cc phoenix-core/src/test/resources/log4j.properties
index 96ebd73,85706b4..575ee67
--- a/phoenix-core/src/test/resources/log4j.properties
+++ b/phoenix-core/src/test/resources/log4j.properties
@@@ -61,4 -61,5 +61,6 @@@ log4j.logger.org.mortbay.log=WAR
log4j.logger.org.apache.hadoop=WARN
log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.apache.hadoop.hbase=DEBUG
+log4j.logger.org.apache.calcite=INFO
+ log4j.logger.org.apache.directory=WARN
+ log4j.logger.net.sf.ehcache=WARN
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a9526a94/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index c21fbe9,b5edb6c..49af7f2
--- a/pom.xml
+++ b/pom.xml
@@@ -92,10 -97,9 +97,10 @@@
<jodatime.version>1.6</jodatime.version>
<joni.version>2.1.2</joni.version>
<avatica.version>1.8.0</avatica.version>
+ <calcite.version>1.9.0-SNAPSHOT</calcite.version>
<jettyVersion>8.1.7.v20120910</jettyVersion>
- <tephra.version>0.7.0</tephra.version>
- <spark.version>1.5.2</spark.version>
+ <tephra.version>0.8.0-incubating</tephra.version>
+ <spark.version>1.6.1</spark.version>
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>