You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/10/24 05:14:29 UTC
[1/3] PHOENIX-944 Support derived tables in FROM clause that need
extra steps of client-side aggregation or other processing
Repository: phoenix
Updated Branches:
refs/heads/master cd1acba17 -> fba06a80f
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
new file mode 100644
index 0000000..346a9fd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class TupleProjector {
+ public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
+ public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
+
+ private static final String SCAN_PROJECTOR = "scanProjector";
+
+ private final KeyValueSchema schema;
+ private final Expression[] expressions;
+ private ValueBitSet valueSet;
+ private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+ public TupleProjector(RowProjector rowProjector) {
+ List<? extends ColumnProjector> columnProjectors = rowProjector.getColumnProjectors();
+ int count = columnProjectors.size();
+ KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+ expressions = new Expression[count];
+ for (int i = 0; i < count; i++) {
+ Expression expression = columnProjectors.get(i).getExpression();
+ builder.addField(expression);
+ expressions[i] = expression;
+ }
+ schema = builder.build();
+ valueSet = ValueBitSet.newInstance(schema);
+ }
+
+ public TupleProjector(ProjectedPTableWrapper projected) {
+ List<PColumn> columns = projected.getTable().getColumns();
+ expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
+ // we do not count minNullableIndex for we might do later merge.
+ KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+ int i = 0;
+ for (PColumn column : projected.getTable().getColumns()) {
+ if (!SchemaUtil.isPKColumn(column)) {
+ builder.addField(column);
+ expressions[i++] = projected.getSourceExpression(column);
+ }
+ }
+ schema = builder.build();
+ valueSet = ValueBitSet.newInstance(schema);
+ }
+
+ public TupleProjector(KeyValueSchema schema, Expression[] expressions) {
+ this.schema = schema;
+ this.expressions = expressions;
+ this.valueSet = ValueBitSet.newInstance(schema);
+ }
+
+ public void setValueBitSet(ValueBitSet bitSet) {
+ this.valueSet = bitSet;
+ }
+
+ public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ try {
+ DataOutputStream output = new DataOutputStream(stream);
+ projector.schema.write(output);
+ int count = projector.expressions.length;
+ WritableUtils.writeVInt(output, count);
+ for (int i = 0; i < count; i++) {
+ WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
+ projector.expressions[i].write(output);
+ }
+ scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ public static TupleProjector deserializeProjectorFromScan(Scan scan) {
+ byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+ if (proj == null) {
+ return null;
+ }
+ ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+ try {
+ DataInputStream input = new DataInputStream(stream);
+ KeyValueSchema schema = new KeyValueSchema();
+ schema.readFields(input);
+ int count = WritableUtils.readVInt(input);
+ Expression[] expressions = new Expression[count];
+ for (int i = 0; i < count; i++) {
+ int ordinal = WritableUtils.readVInt(input);
+ expressions[i] = ExpressionType.values()[ordinal].newInstance();
+ expressions[i].readFields(input);
+ }
+ return new TupleProjector(schema, expressions);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ stream.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static class ProjectedValueTuple extends BaseTuple {
+ private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
+ private long timestamp;
+ private byte[] projectedValue;
+ private int bitSetLen;
+ private KeyValue keyValue;
+
+ private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+ this.keyPtr.set(keyBuffer, keyOffset, keyLength);
+ this.timestamp = timestamp;
+ this.projectedValue = projectedValue;
+ this.bitSetLen = bitSetLen;
+ }
+
+ public ImmutableBytesWritable getKeyPtr() {
+ return keyPtr;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public byte[] getProjectedValue() {
+ return projectedValue;
+ }
+
+ public int getBitSetLength() {
+ return bitSetLen;
+ }
+
+ @Override
+ public void getKey(ImmutableBytesWritable ptr) {
+ ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
+ }
+
+ @Override
+ public KeyValue getValue(int index) {
+ if (index != 0) {
+ throw new IndexOutOfBoundsException(Integer.toString(index));
+ }
+ return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
+ }
+
+ @Override
+ public KeyValue getValue(byte[] family, byte[] qualifier) {
+ if (keyValue == null) {
+ keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
+ VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+ }
+ return keyValue;
+ }
+
+ @Override
+ public boolean getValue(byte[] family, byte[] qualifier,
+ ImmutableBytesWritable ptr) {
+ ptr.set(projectedValue);
+ return true;
+ }
+
+ @Override
+ public boolean isImmutable() {
+ return true;
+ }
+
+ @Override
+ public int size() {
+ return 1;
+ }
+ }
+
+ public ProjectedValueTuple projectResults(Tuple tuple) {
+ byte[] bytesValue = schema.toBytes(tuple, getExpressions(), valueSet, ptr);
+ Cell base = tuple.getValue(0);
+ return new ProjectedValueTuple(base.getRowArray(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+ }
+
+ public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
+ boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
+ if (!b)
+ throw new IOException("Trying to decode a non-projected value.");
+ }
+
+ public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
+ Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
+ ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
+ destBitSet.clear();
+ destBitSet.or(destValue);
+ int origDestBitSetLen = dest.getBitSetLength();
+ ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
+ decodeProjectedValue(src, srcValue);
+ srcBitSet.clear();
+ srcBitSet.or(srcValue);
+ int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+ for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
+ if (srcBitSet.get(i)) {
+ destBitSet.set(offset + i);
+ }
+ }
+ int destBitSetLen = destBitSet.getEstimatedLength();
+ byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
+ int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
+ o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
+ destBitSet.toBytes(merged, o);
+ ImmutableBytesWritable keyPtr = dest.getKeyPtr();
+ return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
+ }
+
+ public KeyValueSchema getSchema() {
+ return schema;
+ }
+
+ public Expression[] getExpressions() {
+ return expressions;
+ }
+
+ public ValueBitSet getValueBitSet() {
+ return valueSet;
+ }
+
+ @Override
+ public String toString() {
+ return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index 1c4232a..8a2f6d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -23,8 +23,8 @@ import java.io.IOException;
import java.util.Collection;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.visitor.ExpressionVisitor;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
import org.apache.phoenix.schema.PColumn;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
index fefb077..2af99ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.PArrayDataType;
import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PhoenixArray;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -37,15 +36,15 @@ public class DistinctValueClientAggregator extends DistinctValueWithCountClientA
@Override
public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
- if (buffer == null || buffer.length == 0) {
+ if (cachedResult == null) {
Object[] values = new Object[valueVsCount.size()];
int i = 0;
for (ImmutableBytesPtr key : valueVsCount.keySet()) {
values[i++] = valueType.toObject(key, sortOrder);
}
- PhoenixArray array = PArrayDataType.instantiatePhoenixArray(valueType, values);
- buffer = resultType.toBytes(array, sortOrder);
+ cachedResult = PArrayDataType.instantiatePhoenixArray(valueType, values);
}
+ buffer = resultType.toBytes(cachedResult, sortOrder);
ptr.set(buffer);
return true;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
new file mode 100644
index 0000000..8fd36b3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+
+/**
+ *
+ * Base class for result scanners that aggregate the row count value for rows with
+ * duplicate keys. This result scanner assumes that the results of the inner result
+ * scanner are returned in order of grouping keys.
+ *
+ */
+public abstract class BaseGroupedAggregatingResultIterator implements
+ AggregatingResultIterator {
+ private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+ protected final PeekingResultIterator resultIterator;
+ protected final Aggregators aggregators;
+ private ImmutableBytesWritable currentKey;
+ private ImmutableBytesWritable nextKey;
+
+ public BaseGroupedAggregatingResultIterator(
+ PeekingResultIterator resultIterator, Aggregators aggregators) {
+ if (resultIterator == null) throw new NullPointerException();
+ if (aggregators == null) throw new NullPointerException();
+ this.resultIterator = resultIterator;
+ this.aggregators = aggregators;
+ this.currentKey = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
+ this.nextKey = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
+ }
+
+ protected abstract ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException;
+ protected abstract Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException;
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple result = resultIterator.next();
+ if (result == null) {
+ return null;
+ }
+ if (currentKey.get() == UNITIALIZED_KEY_BUFFER) {
+ getGroupingKey(result, currentKey);
+ }
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ while (true) {
+ aggregators.aggregate(rowAggregators, result);
+ Tuple nextResult = resultIterator.peek();
+ if (nextResult == null || !currentKey.equals(getGroupingKey(nextResult, nextKey))) {
+ break;
+ }
+ result = resultIterator.next();
+ }
+
+ byte[] value = aggregators.toBytes(rowAggregators);
+ Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(currentKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+ currentKey.set(nextKey.get(), nextKey.getOffset(), nextKey.getLength());
+ return tuple;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ resultIterator.close();
+ }
+
+ @Override
+ public void aggregate(Tuple result) {
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ aggregators.aggregate(rowAggregators, result);
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ resultIterator.explain(planSteps);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
index bb9bf50..1cf9b73 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
@@ -17,19 +17,13 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.query.QueryConstants.*;
-
import java.sql.SQLException;
-import java.util.List;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.TupleUtil;
@@ -51,61 +45,26 @@ import org.apache.phoenix.util.TupleUtil;
*
* @since 0.1
*/
-public class GroupedAggregatingResultIterator implements AggregatingResultIterator {
- private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
- private final PeekingResultIterator resultIterator;
- protected final Aggregators aggregators;
-
- public GroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
- if (resultIterator == null) throw new NullPointerException();
- if (aggregators == null) throw new NullPointerException();
- this.resultIterator = resultIterator;
- this.aggregators = aggregators;
- }
-
- @Override
- public Tuple next() throws SQLException {
- Tuple result = resultIterator.next();
- if (result == null) {
- return null;
- }
- Aggregator[] rowAggregators = aggregators.getAggregators();
- aggregators.reset(rowAggregators);
- while (true) {
- aggregators.aggregate(rowAggregators, result);
- Tuple nextResult = resultIterator.peek();
- if (nextResult == null || !TupleUtil.equals(result, nextResult, tempPtr)) {
- break;
- }
- result = resultIterator.next();
- }
-
- byte[] value = aggregators.toBytes(rowAggregators);
- result.getKey(tempPtr);
- return new SingleKeyValueTuple(KeyValueUtil.newKeyValue(tempPtr, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
- }
-
- @Override
- public void close() throws SQLException {
- resultIterator.close();
+public class GroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
+
+ public GroupedAggregatingResultIterator(PeekingResultIterator resultIterator, Aggregators aggregators) {
+ super(resultIterator, aggregators);
}
-
+
@Override
- public void aggregate(Tuple result) {
- Aggregator[] rowAggregators = aggregators.getAggregators();
- aggregators.reset(rowAggregators);
- aggregators.aggregate(rowAggregators, result);
+ protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
+ tuple.getKey(ptr);
+ return ptr;
}
@Override
- public void explain(List<String> planSteps) {
- resultIterator.explain(planSteps);
+ protected Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException {
+ return new SingleKeyValueTuple(keyValue);
}
@Override
public String toString() {
- return "GroupedAggregatingResultIterator [tempPtr=" + tempPtr
- + ", resultIterator=" + resultIterator + ", aggregators="
- + aggregators + "]";
+ return "GroupedAggregatingResultIterator [resultIterator="
+ + resultIterator + ", aggregators=" + aggregators + "]";
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index a7f390f..971b1a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -26,6 +26,10 @@ import org.apache.phoenix.schema.tuple.Tuple;
abstract public class LookAheadResultIterator implements PeekingResultIterator {
public static LookAheadResultIterator wrap(final ResultIterator iterator) {
+ if (iterator instanceof LookAheadResultIterator) {
+ return (LookAheadResultIterator) iterator;
+ }
+
return new LookAheadResultIterator() {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
deleted file mode 100644
index e0d9336..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.join;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.ExpressionType;
-import org.apache.phoenix.schema.KeyValueSchema;
-import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.ValueBitSet;
-import org.apache.phoenix.schema.tuple.BaseTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-public class TupleProjector {
- public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
- public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
-
- private static final String SCAN_PROJECTOR = "scanProjector";
-
- private final KeyValueSchema schema;
- private final Expression[] expressions;
- private ValueBitSet valueSet;
- private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-
- public TupleProjector(ProjectedPTableWrapper projected) {
- List<PColumn> columns = projected.getTable().getColumns();
- expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
- // we do not count minNullableIndex for we might do later merge.
- KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
- int i = 0;
- for (PColumn column : projected.getTable().getColumns()) {
- if (!SchemaUtil.isPKColumn(column)) {
- builder.addField(column);
- expressions[i++] = projected.getSourceExpression(column);
- }
- }
- schema = builder.build();
- valueSet = ValueBitSet.newInstance(schema);
- }
-
- public TupleProjector(KeyValueSchema schema, Expression[] expressions) {
- this.schema = schema;
- this.expressions = expressions;
- this.valueSet = ValueBitSet.newInstance(schema);
- }
-
- public void setValueBitSet(ValueBitSet bitSet) {
- this.valueSet = bitSet;
- }
-
- public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- try {
- DataOutputStream output = new DataOutputStream(stream);
- projector.schema.write(output);
- int count = projector.expressions.length;
- WritableUtils.writeVInt(output, count);
- for (int i = 0; i < count; i++) {
- WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
- projector.expressions[i].write(output);
- }
- scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- stream.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
-
- public static TupleProjector deserializeProjectorFromScan(Scan scan) {
- byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
- if (proj == null) {
- return null;
- }
- ByteArrayInputStream stream = new ByteArrayInputStream(proj);
- try {
- DataInputStream input = new DataInputStream(stream);
- KeyValueSchema schema = new KeyValueSchema();
- schema.readFields(input);
- int count = WritableUtils.readVInt(input);
- Expression[] expressions = new Expression[count];
- for (int i = 0; i < count; i++) {
- int ordinal = WritableUtils.readVInt(input);
- expressions[i] = ExpressionType.values()[ordinal].newInstance();
- expressions[i].readFields(input);
- }
- return new TupleProjector(schema, expressions);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- stream.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public static class ProjectedValueTuple extends BaseTuple {
- private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
- private long timestamp;
- private byte[] projectedValue;
- private int bitSetLen;
- private KeyValue keyValue;
-
- private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
- this.keyPtr.set(keyBuffer, keyOffset, keyLength);
- this.timestamp = timestamp;
- this.projectedValue = projectedValue;
- this.bitSetLen = bitSetLen;
- }
-
- public ImmutableBytesWritable getKeyPtr() {
- return keyPtr;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public byte[] getProjectedValue() {
- return projectedValue;
- }
-
- public int getBitSetLength() {
- return bitSetLen;
- }
-
- @Override
- public void getKey(ImmutableBytesWritable ptr) {
- ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
- }
-
- @Override
- public KeyValue getValue(int index) {
- if (index != 0) {
- throw new IndexOutOfBoundsException(Integer.toString(index));
- }
- return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
- }
-
- @Override
- public KeyValue getValue(byte[] family, byte[] qualifier) {
- if (keyValue == null) {
- keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
- VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
- }
- return keyValue;
- }
-
- @Override
- public boolean getValue(byte[] family, byte[] qualifier,
- ImmutableBytesWritable ptr) {
- ptr.set(projectedValue);
- return true;
- }
-
- @Override
- public boolean isImmutable() {
- return true;
- }
-
- @Override
- public int size() {
- return 1;
- }
- }
-
- public ProjectedValueTuple projectResults(Tuple tuple) {
- byte[] bytesValue = schema.toBytes(tuple, getExpressions(), valueSet, ptr);
- Cell base = tuple.getValue(0);
- return new ProjectedValueTuple(base.getRowArray(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
- }
-
- public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
- boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
- if (!b)
- throw new IOException("Trying to decode a non-projected value.");
- }
-
- public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
- Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
- ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
- destBitSet.clear();
- destBitSet.or(destValue);
- int origDestBitSetLen = dest.getBitSetLength();
- ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
- decodeProjectedValue(src, srcValue);
- srcBitSet.clear();
- srcBitSet.or(srcValue);
- int origSrcBitSetLen = srcBitSet.getEstimatedLength();
- for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
- if (srcBitSet.get(i)) {
- destBitSet.set(offset + i);
- }
- }
- int destBitSetLen = destBitSet.getEstimatedLength();
- byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
- int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
- o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
- destBitSet.toBytes(merged, o);
- ImmutableBytesWritable keyPtr = dest.getKeyPtr();
- return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
- }
-
- public KeyValueSchema getSchema() {
- return schema;
- }
-
- public Expression[] getExpressions() {
- return expressions;
- }
-
- public ValueBitSet getValueBitSet() {
- return valueSet;
- }
-
- @Override
- public String toString() {
- return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
- }
-}
-
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 6a68df3..f027ab3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -102,6 +102,7 @@ public class QueryOptimizer {
if (!useIndexes
|| select.isJoin()
|| dataPlan.getContext().getResolver().getTables().size() > 1
+ || select.getInnerSelectStatement() != null
|| (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan)) {
return Collections.singletonList(dataPlan);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 4f5e7ae..6f8339e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -630,8 +630,8 @@ public class ParseNodeFactory {
statement.hasSequence());
}
- public SelectStatement select(SelectStatement statement, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) {
- return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), select, where, groupBy,
+ public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) {
+ return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, groupBy,
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), isAggregate,
statement.hasSequence());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index 6cee588..e7302dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -192,4 +192,11 @@ public class SelectStatement implements FilterableStatement {
public boolean isJoin() {
return fromTable.size() > 1 || (fromTable.size() > 0 && fromTable.get(0) instanceof JoinTableNode);
}
+
+ public SelectStatement getInnerSelectStatement() {
+ if (fromTable.size() != 1 || !(fromTable.get(0) instanceof DerivedTableNode))
+ return null;
+
+ return ((DerivedTableNode) fromTable.get(0)).getSelect();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 7a00082..d5ccc5c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -123,7 +123,7 @@ public class ColumnRef {
// use ProjectedColumnExpression, but not sure. The column values from the data
// table should get returned in a single KeyValue in a similar format (using a
// KeyValueSchema).
- if (table.getType() == PTableType.JOIN) {
+ if (table.getType() == PTableType.JOIN || table.getType() == PTableType.SUBQUERY) {
return new ProjectedColumnExpression(column, table, column.getName().getString());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 54121cb..7ed949a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.RowKeyColumnExpression;
@@ -53,7 +54,6 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
[2/3] git commit: PHOENIX-944 Support derived tables in FROM clause
that need extra steps of client-side aggregation or other processing
Posted by ma...@apache.org.
PHOENIX-944 Support derived tables in FROM clause that need extra steps of client-side aggregation or other processing
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f78a3d70
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f78a3d70
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f78a3d70
Branch: refs/heads/master
Commit: f78a3d70941ab2060bb98efc96c91142e7db6767
Parents: cd1acba
Author: maryannxue <ma...@apache.org>
Authored: Wed Oct 22 13:48:35 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Thu Oct 23 23:08:11 2014 -0400
----------------------------------------------------------------------
.../apache/phoenix/end2end/DerivedTableIT.java | 282 +++++++++++++------
.../org/apache/phoenix/end2end/SubqueryIT.java | 12 +
.../apache/phoenix/compile/FromCompiler.java | 27 +-
.../apache/phoenix/compile/GroupByCompiler.java | 5 +-
.../apache/phoenix/compile/JoinCompiler.java | 2 +-
.../apache/phoenix/compile/OrderByCompiler.java | 2 +-
.../apache/phoenix/compile/QueryCompiler.java | 58 +++-
.../phoenix/compile/SubqueryRewriter.java | 10 +-
.../TrackOrderPreservingExpressionCompiler.java | 27 +-
.../apache/phoenix/compile/WhereCompiler.java | 33 +--
.../GroupedAggregateRegionObserver.java | 2 +-
.../coprocessor/HashJoinRegionScanner.java | 4 +-
.../phoenix/coprocessor/ScanRegionObserver.java | 3 +-
.../UngroupedAggregateRegionObserver.java | 2 +-
.../phoenix/execute/ClientAggregatePlan.java | 229 +++++++++++++++
.../phoenix/execute/ClientProcessingPlan.java | 82 ++++++
.../apache/phoenix/execute/ClientScanPlan.java | 92 ++++++
.../apache/phoenix/execute/HashJoinPlan.java | 24 +-
.../phoenix/execute/TupleProjectionPlan.java | 49 +---
.../apache/phoenix/execute/TupleProjector.java | 276 ++++++++++++++++++
.../expression/ProjectedColumnExpression.java | 2 +-
.../DistinctValueClientAggregator.java | 7 +-
.../BaseGroupedAggregatingResultIterator.java | 105 +++++++
.../GroupedAggregatingResultIterator.java | 67 +----
.../iterate/LookAheadResultIterator.java | 4 +
.../org/apache/phoenix/join/TupleProjector.java | 260 -----------------
.../apache/phoenix/optimize/QueryOptimizer.java | 1 +
.../apache/phoenix/parse/ParseNodeFactory.java | 4 +-
.../apache/phoenix/parse/SelectStatement.java | 7 +
.../org/apache/phoenix/schema/ColumnRef.java | 2 +-
.../java/org/apache/phoenix/util/IndexUtil.java | 2 +-
31 files changed, 1183 insertions(+), 499 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
index 8a80764..8ef542a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DerivedTableIT.java
@@ -35,19 +35,19 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
-import java.sql.SQLFeatureNotSupportedException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -61,33 +61,65 @@ import com.google.common.collect.Lists;
@RunWith(Parameterized.class)
public class DerivedTableIT extends BaseClientManagedTimeIT {
private static final String tenantId = getOrganizationId();
- private static final String MSG = "Complex nested queries not supported.";
private long ts;
- private String indexDDL;
+ private String[] indexDDL;
+ private String[] plans;
- public DerivedTableIT(String indexDDL) {
+ public DerivedTableIT(String[] indexDDL, String[] plans) {
this.indexDDL = indexDDL;
+ this.plans = plans;
}
@Before
public void initTable() throws Exception {
ts = nextTimestamp();
initATableValues(tenantId, getDefaultSplits(tenantId), null, ts);
- if (indexDDL != null && indexDDL.length() > 0) {
+ if (indexDDL != null && indexDDL.length > 0) {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts));
Connection conn = DriverManager.getConnection(getUrl(), props);
- conn.createStatement().execute(indexDDL);
+ for (String ddl : indexDDL) {
+ conn.createStatement().execute(ddl);
+ }
}
}
@Parameters(name="{0}")
public static Collection<Object> data() {
List<Object> testCases = Lists.newArrayList();
- testCases.add(new String[] { "CREATE INDEX ATABLE_DERIVED_IDX ON aTable (a_byte) INCLUDE ("
- + " A_STRING, " + " B_STRING)" });
- testCases.add(new String[] { "" });
+ testCases.add(new String[][] {
+ {
+ "CREATE INDEX ATABLE_DERIVED_IDX ON aTable (a_byte) INCLUDE (A_STRING, B_STRING)"
+ }, {
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE_DERIVED_IDX\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
+ "CLIENT MERGE SORT\n" +
+ "CLIENT SORTED BY [B_STRING]\n" +
+ "CLIENT SORTED BY [A]\n" +
+ "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
+ "CLIENT SORTED BY [A DESC]",
+
+ "CLIENT PARALLEL 1-WAY FULL SCAN OVER ATABLE_DERIVED_IDX\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
+ "CLIENT MERGE SORT\n" +
+ "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
+ "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]"}});
+ testCases.add(new String[][] {
+ {}, {
+ "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
+ "CLIENT MERGE SORT\n" +
+ "CLIENT SORTED BY [B_STRING]\n" +
+ "CLIENT SORTED BY [A]\n" +
+ "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
+ "CLIENT SORTED BY [A DESC]",
+
+ "CLIENT PARALLEL 4-WAY FULL SCAN OVER ATABLE\n" +
+ " SERVER AGGREGATE INTO DISTINCT ROWS BY [A_STRING, B_STRING]\n" +
+ "CLIENT MERGE SORT\n" +
+ "CLIENT AGGREGATE INTO DISTINCT ROWS BY [A]\n" +
+ "CLIENT DISTINCT ON [COLLECTDISTINCT(B)]"}});
return testCases;
}
@@ -183,21 +215,21 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
// (limit) where
query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM aTable LIMIT 2) AS t WHERE t.b = '" + C_VALUE + "'";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(ROW2,rs.getString(1));
+
+ assertFalse(rs.next());
// (count) where
query = "SELECT t.c FROM (SELECT count(*) c FROM aTable) AS t WHERE t.c > 0";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(9,rs.getInt(1));
+
+ assertFalse(rs.next());
} finally {
conn.close();
}
@@ -227,12 +259,78 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
// (groupby) groupby
query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t GROUP BY t.c";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertEquals(1,rs.getInt(2));
+ assertTrue (rs.next());
+ assertEquals(4,rs.getInt(1));
+ assertEquals(2,rs.getInt(2));
+
+ assertFalse(rs.next());
+
+ // (groupby) groupby orderby
+ query = "SELECT t.c, count(*) FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t GROUP BY t.c ORDER BY count(*) DESC";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(4,rs.getInt(1));
+ assertEquals(2,rs.getInt(2));
+ assertTrue (rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertEquals(1,rs.getInt(2));
+
+ assertFalse(rs.next());
+
+ // (groupby a, b orderby b) groupby a orderby a
+ query = "SELECT t.a, COLLECTDISTINCT(t.b) FROM (SELECT b_string b, a_string a FROM aTable GROUP BY a_string, b_string ORDER BY b_string) AS t GROUP BY t.a ORDER BY t.a DESC";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(C_VALUE,rs.getString(1));
+ String[] b = new String[1];
+ b[0] = E_VALUE;
+ Array array = conn.createArrayOf("VARCHAR", b);
+ assertEquals(array,rs.getArray(2));
+ assertTrue (rs.next());
+ assertEquals(B_VALUE,rs.getString(1));
+ b = new String[3];
+ b[0] = B_VALUE;
+ b[1] = C_VALUE;
+ b[2] = E_VALUE;
+ array = conn.createArrayOf("VARCHAR", b);
+ assertEquals(array,rs.getArray(2));
+ assertTrue (rs.next());
+ assertEquals(A_VALUE,rs.getString(1));
+ assertEquals(array,rs.getArray(2));
+
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertEquals(plans[0], QueryUtil.getExplainPlan(rs));
+
+ // distinct b (groupby b, a) groupby a
+ query = "SELECT DISTINCT COLLECTDISTINCT(t.b) FROM (SELECT b_string b, a_string a FROM aTable GROUP BY b_string, a_string) AS t GROUP BY t.a";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ b = new String[1];
+ b[0] = E_VALUE;
+ array = conn.createArrayOf("VARCHAR", b);
+ assertEquals(array,rs.getArray(1));
+ assertTrue (rs.next());
+ b = new String[3];
+ b[0] = B_VALUE;
+ b[1] = C_VALUE;
+ b[2] = E_VALUE;
+ array = conn.createArrayOf("VARCHAR", b);
+ assertEquals(array,rs.getArray(1));
+
+ assertFalse(rs.next());
+
+ rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+ assertEquals(plans[1], QueryUtil.getExplainPlan(rs));
} finally {
conn.close();
}
@@ -321,13 +419,15 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
assertFalse(rs.next());
// (limit) orderby
- query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM aTable LIMIT 2) AS t ORDER BY t.b, t.eid";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ query = "SELECT t.eid FROM (SELECT entity_id eid, b_string b FROM aTable LIMIT 2) AS t ORDER BY t.b DESC, t.eid";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(ROW2,rs.getString(1));
+ assertTrue (rs.next());
+ assertEquals(ROW1,rs.getString(1));
+
+ assertFalse(rs.next());
} finally {
conn.close();
}
@@ -386,15 +486,16 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
// limit ? limit ?
query = "SELECT t.eid FROM (SELECT entity_id eid FROM aTable LIMIT ?) AS t LIMIT ?";
- try {
- statement = conn.prepareStatement(query);
- statement.setInt(1, 4);
- statement.setInt(2, 2);
- statement.executeQuery();
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ statement.setInt(1, 4);
+ statement.setInt(2, 2);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(ROW1,rs.getString(1));
+ assertTrue (rs.next());
+ assertEquals(ROW2,rs.getString(1));
+
+ assertFalse(rs.next());
// (groupby orderby) limit
query = "SELECT a, s FROM (SELECT a_string a, sum(a_byte) s FROM aTable GROUP BY a_string ORDER BY sum(a_byte)) LIMIT 2";
@@ -466,30 +567,51 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
// distinct (distinct)
query = "SELECT DISTINCT t.a FROM (SELECT DISTINCT a_string a, b_string b FROM aTable) AS t";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(A_VALUE,rs.getString(1));
+ assertTrue (rs.next());
+ assertEquals(B_VALUE,rs.getString(1));
+ assertTrue (rs.next());
+ assertEquals(C_VALUE,rs.getString(1));
+
+ assertFalse(rs.next());
// distinct (groupby)
query = "SELECT distinct t.c FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(1,rs.getInt(1));
+ assertTrue (rs.next());
+ assertEquals(4,rs.getInt(1));
+
+ assertFalse(rs.next());
+
+ // distinct (groupby) orderby
+ query = "SELECT distinct t.c FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t ORDER BY t.c DESC";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(4,rs.getInt(1));
+ assertTrue (rs.next());
+ assertEquals(1,rs.getInt(1));
+
+ assertFalse(rs.next());
// distinct (limit)
query = "SELECT DISTINCT t.a, t.b FROM (SELECT a_string a, b_string b FROM aTable LIMIT 2) AS t";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(A_VALUE,rs.getString(1));
+ assertEquals(B_VALUE,rs.getString(2));
+ assertTrue (rs.next());
+ assertEquals(A_VALUE,rs.getString(1));
+ assertEquals(C_VALUE,rs.getString(2));
+
+ assertFalse(rs.next());
} finally {
conn.close();
}
@@ -522,30 +644,30 @@ public class DerivedTableIT extends BaseClientManagedTimeIT {
// count (distinct)
query = "SELECT count(*) FROM (SELECT DISTINCT a_string FROM aTable) AS t";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(3,rs.getInt(1));
+
+ assertFalse(rs.next());
// count (groupby)
query = "SELECT count(*) FROM (SELECT count(*) c FROM aTable GROUP BY a_string) AS t";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(3,rs.getInt(1));
+
+ assertFalse(rs.next());
// count (limit)
query = "SELECT count(*) FROM (SELECT entity_id FROM aTable LIMIT 2) AS t";
- try {
- conn.createStatement().executeQuery(query);
- fail("Should have got SQLFeatureNotSupportedException");
- } catch (SQLFeatureNotSupportedException e) {
- assertEquals(MSG, e.getMessage());
- }
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(2,rs.getInt(1));
+
+ assertFalse(rs.next());
} finally {
conn.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
index e4b4c8b..4f3ca16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SubqueryIT.java
@@ -978,6 +978,18 @@ public class SubqueryIT extends BaseHBaseManagedTimeIT {
assertEquals(rs.getString(2), "T6");
assertFalse(rs.next());
+
+ query = "SELECT \"order_id\", name FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" WHERE quantity != ANY(SELECT quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " q WHERE o.\"item_id\" = q.\"item_id\" GROUP BY quantity)";
+ statement = conn.prepareStatement(query);
+ rs = statement.executeQuery();
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000002");
+ assertEquals(rs.getString(2), "T6");
+ assertTrue (rs.next());
+ assertEquals(rs.getString(1), "000000000000004");
+ assertEquals(rs.getString(2), "T6");
+
+ assertFalse(rs.next());
} finally {
conn.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 0fed42a..1627f45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.BindTableNode;
@@ -175,6 +176,23 @@ public class FromCompiler {
SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, statement.getTable(), true);
return visitor;
}
+
+ public static ColumnResolver getResolverForCompiledDerivedTable(PhoenixConnection connection, TableRef tableRef, RowProjector projector)
+ throws SQLException {
+ List<PColumn> projectedColumns = new ArrayList<PColumn>();
+ List<Expression> sourceExpressions = new ArrayList<Expression>();
+ PTable table = tableRef.getTable();
+ for (PColumn column : table.getColumns()) {
+ Expression sourceExpression = projector.getColumnProjector(column.getPosition()).getExpression();
+ PColumnImpl projectedColumn = new PColumnImpl(column.getName(), column.getFamilyName(),
+ sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(),
+ column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced());
+ projectedColumns.add(projectedColumn);
+ sourceExpressions.add(sourceExpression);
+ }
+ PTable t = PTableImpl.makePTable(table, projectedColumns);
+ return new SingleTableColumnResolver(connection, new TableRef(tableRef.getTableAlias(), t, tableRef.getLowerBoundTimeStamp(), tableRef.hasDynamicCols()));
+ }
public static ColumnResolver getResolverForMutation(DMLStatement statement, PhoenixConnection connection)
throws SQLException {
@@ -215,6 +233,12 @@ public class FromCompiler {
TableRef tableRef = createTableRef(tableNode, updateCacheImmediately);
tableRefs = ImmutableList.of(tableRef);
}
+
+ public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef) {
+ super(connection, 0);
+ alias = tableRef.getTableAlias();
+ tableRefs = ImmutableList.of(tableRef);
+ }
@Override
public List<TableRef> getTables() {
@@ -366,8 +390,7 @@ public class FromCompiler {
}
}
- // TODO: unused, but should be used for joins - make private once used
- public static class MultiTableColumnResolver extends BaseColumnResolver implements TableNodeVisitor<Void> {
+ private static class MultiTableColumnResolver extends BaseColumnResolver implements TableNodeVisitor<Void> {
private final ListMultimap<String, TableRef> tableMap;
private final List<TableRef> tables;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
index dda27aa..a561a47 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
@@ -30,6 +30,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.CoerceExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.parse.AliasedNode;
@@ -135,7 +136,7 @@ public class GroupByCompiler {
* @throws ColumnNotFoundException if column name could not be resolved
* @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
- public static GroupBy compile(StatementContext context, SelectStatement statement) throws SQLException {
+ public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector) throws SQLException {
List<ParseNode> groupByNodes = statement.getGroupBy();
/**
* Distinct can use an aggregate plan if there's no group by.
@@ -160,7 +161,7 @@ public class GroupByCompiler {
TrackOrderPreservingExpressionCompiler groupByVisitor =
new TrackOrderPreservingExpressionCompiler(context,
GroupBy.EMPTY_GROUP_BY, groupByNodes.size(),
- Ordering.UNORDERED);
+ Ordering.UNORDERED, tupleProjector);
for (ParseNode node : groupByNodes) {
Expression expression = node.accept(groupByVisitor);
if (groupByVisitor.isAggregate()) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index ef053de..140146c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -35,12 +35,12 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.AndExpression;
import org.apache.phoenix.expression.CoerceExpression;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.function.CountAggregateFunction;
import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.parse.AliasedNode;
import org.apache.phoenix.parse.AndParseNode;
import org.apache.phoenix.parse.BindTableNode;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 0fd07ec..2629846 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -85,7 +85,7 @@ public class OrderByCompiler {
// accumulate columns in ORDER BY
TrackOrderPreservingExpressionCompiler visitor =
new TrackOrderPreservingExpressionCompiler(context, groupBy,
- orderByNodes.size(), Ordering.ORDERED);
+ orderByNodes.size(), Ordering.ORDERED, null);
LinkedHashSet<OrderByExpression> orderByExpressions = Sets.newLinkedHashSetWithExpectedSize(orderByNodes.size());
for (OrderByNode node : orderByNodes) {
boolean isAscending = node.isAscending();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index d82ac02..0eafcdb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -34,11 +34,14 @@ import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
import org.apache.phoenix.compile.JoinCompiler.Table;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.execute.AggregatePlan;
+import org.apache.phoenix.execute.ClientAggregatePlan;
+import org.apache.phoenix.execute.ClientScanPlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan;
import org.apache.phoenix.execute.ScanPlan;
import org.apache.phoenix.execute.TupleProjectionPlan;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -47,7 +50,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.parse.ParseNode;
@@ -59,11 +61,11 @@ import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.ScanUtil;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
@@ -349,30 +351,49 @@ public class QueryCompiler {
}
protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
+ SelectStatement innerSelect = select.getInnerSelectStatement();
+ if (innerSelect == null) {
+ return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null);
+ }
+
+ QueryPlan innerPlan = compileSubquery(innerSelect);
+ TupleProjector tupleProjector = new TupleProjector(innerPlan.getProjector());
+ innerPlan = new TupleProjectionPlan(innerPlan, tupleProjector, null);
+
+ // Replace the original resolver and table with those having compiled type info.
+ TableRef tableRef = context.getResolver().getTables().get(0);
+ ColumnResolver resolver = FromCompiler.getResolverForCompiledDerivedTable(statement.getConnection(), tableRef, innerPlan.getProjector());
+ context.setResolver(resolver);
+ tableRef = resolver.getTables().get(0);
+ context.setCurrentTable(tableRef);
+
+ return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, innerPlan.getOrderBy().getOrderByExpressions().isEmpty() ? tupleProjector : null);
+ }
+
+ protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector) throws SQLException{
PhoenixConnection connection = statement.getConnection();
ColumnResolver resolver = context.getResolver();
TableRef tableRef = context.getCurrentTable();
PTable table = tableRef.getTable();
- // TODO PHOENIX-944. See DerivedTableIT for a list of unsupported cases.
- if (table.getType() == PTableType.SUBQUERY)
- throw new SQLFeatureNotSupportedException("Complex nested queries not supported.");
-
ParseNode viewWhere = null;
if (table.getViewStatement() != null) {
viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere();
}
Integer limit = LimitCompiler.compile(context, select);
- GroupBy groupBy = GroupByCompiler.compile(context, select);
+ GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector);
// Optimize the HAVING clause by finding any group by expressions that can be moved
// to the WHERE clause
select = HavingCompiler.rewrite(context, select, groupBy);
Expression having = HavingCompiler.compile(context, select, groupBy);
// Don't pass groupBy when building where clause expression, because we do not want to wrap these
// expressions as group by key expressions since they're pre, not post filtered.
- context.setResolver(FromCompiler.getResolverForQuery(select, connection));
- Set<SubqueryParseNode> subqueries = WhereCompiler.compile(context, select, viewWhere);
+ if (innerPlan == null) {
+ context.setResolver(FromCompiler.getResolverForQuery(select, connection));
+ }
+ Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet();
+ Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries);
context.setResolver(resolver); // recover resolver
OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit);
RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns);
@@ -386,10 +407,14 @@ public class QueryCompiler {
limit = maxRows;
}
}
- ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory;
- QueryPlan plan = select.isAggregate() || select.isDistinct() ?
- new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having)
- : new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter);
+
+ QueryPlan plan = innerPlan;
+ if (plan == null) {
+ ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory;
+ plan = select.isAggregate() || select.isDistinct() ?
+ new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having)
+ : new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter);
+ }
if (!subqueries.isEmpty()) {
int count = subqueries.size();
WhereClauseSubPlan[] subPlans = new WhereClauseSubPlan[count];
@@ -401,6 +426,13 @@ public class QueryCompiler {
plan = HashJoinPlan.create(select, plan, null, subPlans);
}
+ if (innerPlan != null) {
+ plan = select.isAggregate() || select.isDistinct() ?
+ new ClientAggregatePlan(context, select, tableRef, projector, limit, where, orderBy, groupBy, having, plan)
+ : new ClientScanPlan(context, select, tableRef, projector, limit, where, orderBy, plan);
+
+ }
+
return plan;
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
index 4b37259..3e470ce 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubqueryRewriter.java
@@ -238,7 +238,7 @@ public class SubqueryRewriter extends ParseNodeRewriter {
groupbyNodes.add(aliasedNode.getNode());
}
groupbyNodes.addAll(subquery.getGroupBy());
- subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true);
+ subquery = NODE_FACTORY.select(subquery, subquery.isDistinct(), selectNodes, where, groupbyNodes, true);
}
ParseNode onNode = conditionExtractor.getJoinCondition();
@@ -323,11 +323,11 @@ public class SubqueryRewriter extends ParseNodeRewriter {
}
if (derivedTableAlias == null) {
- subquery = NODE_FACTORY.select(subquery, selectNodes, where, groupbyNodes, true);
+ subquery = NODE_FACTORY.select(subquery, false, selectNodes, where, groupbyNodes, true);
} else {
List<ParseNode> derivedTableGroupBy = Lists.newArrayListWithExpectedSize(subquery.getGroupBy().size() + groupbyNodes.size());
- derivedTableGroupBy.addAll(subquery.getGroupBy());
derivedTableGroupBy.addAll(groupbyNodes);
+ derivedTableGroupBy.addAll(subquery.getGroupBy());
List<AliasedNode> derivedTableSelect = Lists.newArrayListWithExpectedSize(aliasedNodes.size() + selectNodes.size() - 1);
derivedTableSelect.addAll(aliasedNodes);
for (int i = 1; i < selectNodes.size(); i++) {
@@ -338,8 +338,8 @@ public class SubqueryRewriter extends ParseNodeRewriter {
selectNodes.set(i, aliasedNode);
groupbyNodes.set(i - 1, aliasedNode.getNode());
}
- SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, derivedTableSelect, where, derivedTableGroupBy, true);
- subquery = NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt)), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, subquery.hasSequence());
+ SelectStatement derivedTableStmt = NODE_FACTORY.select(subquery, subquery.isDistinct(), derivedTableSelect, where, derivedTableGroupBy, true);
+ subquery = NODE_FACTORY.select(Collections.singletonList(NODE_FACTORY.derivedTable(derivedTableAlias, derivedTableStmt)), subquery.getHint(), false, selectNodes, null, groupbyNodes, null, Collections.<OrderByNode> emptyList(), null, subquery.getBindCount(), true, false);
}
ParseNode onNode = conditionExtractor.getJoinCondition();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
index 44f9527..9fd6837 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TrackOrderPreservingExpressionCompiler.java
@@ -24,8 +24,10 @@ import java.util.Comparator;
import java.util.List;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.expression.RowKeyColumnExpression;
import org.apache.phoenix.expression.function.FunctionExpression;
import org.apache.phoenix.expression.function.FunctionExpression.OrderPreserving;
import org.apache.phoenix.parse.CaseParseNode;
@@ -35,10 +37,8 @@ import org.apache.phoenix.parse.MultiplyParseNode;
import org.apache.phoenix.parse.SubtractParseNode;
import org.apache.phoenix.schema.ColumnRef;
import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.util.SchemaUtil;
-
import com.google.common.collect.Lists;
/**
@@ -57,12 +57,13 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler {
private final List<Entry> entries;
private final Ordering ordering;
private final int positionOffset;
+ private final TupleProjector tupleProjector; // for derived-table query compilation
private OrderPreserving orderPreserving = OrderPreserving.YES;
private ColumnRef columnRef;
private boolean isOrderPreserving = true;
private Boolean isReverse;
- TrackOrderPreservingExpressionCompiler(StatementContext context, GroupBy groupBy, int expectedEntrySize, Ordering ordering) {
+ TrackOrderPreservingExpressionCompiler(StatementContext context, GroupBy groupBy, int expectedEntrySize, Ordering ordering, TupleProjector tupleProjector) {
super(context, groupBy);
PTable table = context.getResolver().getTables().get(0).getTable();
boolean isSalted = table.getBucketNum() != null;
@@ -72,6 +73,7 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler {
positionOffset = (isSalted ? 1 : 0) + (isMultiTenant ? 1 : 0) + (isSharedViewIndex ? 1 : 0);
entries = Lists.newArrayListWithExpectedSize(expectedEntrySize);
this.ordering = ordering;
+ this.tupleProjector = tupleProjector;
}
public Boolean isReverse() {
@@ -159,7 +161,7 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler {
ColumnRef ref = super.resolveColumn(node);
// If we encounter any non PK column, then we can't aggregate on-the-fly
// because the distinct groups have no correlation to the KV column value
- if (!SchemaUtil.isPKColumn(ref.getColumn())) {
+ if (getColumnPKPosition(ref) < 0) {
orderPreserving = OrderPreserving.NO;
}
@@ -173,6 +175,17 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler {
}
return ref;
}
+
+ private int getColumnPKPosition(ColumnRef ref) {
+ if (tupleProjector != null && ref.getTable().getType() == PTableType.SUBQUERY) {
+ Expression expression = tupleProjector.getExpressions()[ref.getColumnPosition()];
+ if (expression instanceof RowKeyColumnExpression) {
+ return ((RowKeyColumnExpression) expression).getPosition();
+ }
+ }
+
+ return ref.getPKSlotPosition();
+ }
public boolean addEntry(Expression expression) {
if (expression instanceof LiteralExpression) {
@@ -206,7 +219,7 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler {
return entries;
}
- public static class Entry {
+ public class Entry {
private final Expression expression;
private final ColumnRef columnRef;
private final OrderPreserving orderPreserving;
@@ -222,7 +235,7 @@ public class TrackOrderPreservingExpressionCompiler extends ExpressionCompiler {
}
public int getPkPosition() {
- return columnRef.getPKSlotPosition();
+ return getColumnPKPosition(columnRef);
}
public int getColumnPosition() {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 51d0ffc..2c49fed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -41,7 +41,6 @@ import org.apache.phoenix.filter.SingleCFCQKeyValueComparisonFilter;
import org.apache.phoenix.filter.SingleCQKeyValueComparisonFilter;
import org.apache.phoenix.parse.ColumnParseNode;
import org.apache.phoenix.parse.FilterableStatement;
-import org.apache.phoenix.parse.HintNode.Hint;
import org.apache.phoenix.parse.ParseNode;
import org.apache.phoenix.parse.ParseNodeFactory;
import org.apache.phoenix.parse.SelectStatement;
@@ -78,8 +77,8 @@ public class WhereCompiler {
private WhereCompiler() {
}
- public static Set<SubqueryParseNode> compile(StatementContext context, FilterableStatement statement) throws SQLException {
- return compile(context, statement, null);
+ public static Expression compile(StatementContext context, FilterableStatement statement) throws SQLException {
+ return compile(context, statement, null, null);
}
/**
@@ -92,8 +91,8 @@ public class WhereCompiler {
* @throws ColumnNotFoundException if column name could not be resolved
* @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
- public static Set<SubqueryParseNode> compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere) throws SQLException {
- return compile(context, statement, viewWhere, Collections.<Expression>emptyList(), false);
+ public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, Set<SubqueryParseNode> subqueryNodes) throws SQLException {
+ return compile(context, statement, viewWhere, Collections.<Expression>emptyList(), false, subqueryNodes);
}
/**
@@ -106,18 +105,20 @@ public class WhereCompiler {
* @throws ColumnNotFoundException if column name could not be resolved
* @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
*/
- public static Set<SubqueryParseNode> compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, List<Expression> dynamicFilters, boolean hashJoinOptimization) throws SQLException {
+ public static Expression compile(StatementContext context, FilterableStatement statement, ParseNode viewWhere, List<Expression> dynamicFilters, boolean hashJoinOptimization, Set<SubqueryParseNode> subqueryNodes) throws SQLException {
ParseNode where = statement.getWhere();
- Set<SubqueryParseNode> subqueryNodes = Sets.<SubqueryParseNode> newHashSet();
- SubqueryParseNodeVisitor subqueryVisitor = new SubqueryParseNodeVisitor(context, subqueryNodes);
- if (where != null) {
- where.accept(subqueryVisitor);
- }
- if (viewWhere != null) {
- viewWhere.accept(subqueryVisitor);
+ if (subqueryNodes != null) { // if the subqueryNodes passed in is null, we assume there will be no sub-queries in the WHERE clause.
+ SubqueryParseNodeVisitor subqueryVisitor = new SubqueryParseNodeVisitor(context, subqueryNodes);
+ if (where != null) {
+ where.accept(subqueryVisitor);
+ }
+ if (viewWhere != null) {
+ viewWhere.accept(subqueryVisitor);
+ }
+ if (!subqueryNodes.isEmpty()) {
+ return null;
+ }
}
- if (!subqueryNodes.isEmpty())
- return subqueryNodes;
Set<Expression> extractedNodes = Sets.<Expression>newHashSet();
WhereExpressionCompiler whereCompiler = new WhereExpressionCompiler(context);
@@ -142,7 +143,7 @@ public class WhereCompiler {
expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes);
setScanFilter(context, statement, expression, whereCompiler.disambiguateWithFamily, hashJoinOptimization);
- return subqueryNodes;
+ return expression;
}
private static class WhereExpressionCompiler extends ExpressionCompiler {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 8add152..91a9bdd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.cache.aggcache.SpillableGroupByCache;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.aggregator.Aggregator;
@@ -60,7 +61,6 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PDataType;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 8e0d42d..724122d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -34,11 +34,11 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.HashCache;
import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.execute.TupleProjector;
+import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
-import org.apache.phoenix.join.TupleProjector.ProjectedValueTuple;
import org.apache.phoenix.parse.JoinTableNode.JoinType;
import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.KeyValueSchema;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 8c72dd5..1672fd7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Set;
import com.google.common.collect.Sets;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.KeyValueColumnExpression;
import org.apache.phoenix.expression.OrderByExpression;
@@ -51,7 +53,6 @@ import org.apache.phoenix.iterate.OrderedResultIterator;
import org.apache.phoenix.iterate.RegionScannerResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.KeyValueSchema;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 710409f..d915948 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.coprocessor.generated.PTableProtos;
import org.apache.phoenix.exception.ValueTypeIncompatibleException;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.aggregator.Aggregator;
@@ -70,7 +71,6 @@ import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.ConstraintViolationException;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
new file mode 100644
index 0000000..a9347e1
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.iterate.AggregatingResultIterator;
+import org.apache.phoenix.iterate.BaseGroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.DistinctAggregatingResultIterator;
+import org.apache.phoenix.iterate.FilterAggregatingResultIterator;
+import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.GroupedAggregatingResultIterator;
+import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.OrderedAggregatingResultIterator;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.PeekingResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.iterate.UngroupedAggregatingResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.TupleUtil;
+
+import com.google.common.collect.Lists;
+
+public class ClientAggregatePlan extends ClientProcessingPlan {
+ private final GroupBy groupBy;
+ private final Expression having;
+ private final Aggregators serverAggregators;
+ private final Aggregators clientAggregators;
+
+ public ClientAggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector,
+ Integer limit, Expression where, OrderBy orderBy, GroupBy groupBy, Expression having, QueryPlan delegate) {
+ super(context, statement, table, projector, limit, where, orderBy, delegate);
+ this.groupBy = groupBy;
+ this.having = having;
+ this.serverAggregators =
+ ServerAggregators.deserialize(context.getScan()
+ .getAttribute(BaseScannerRegionObserver.AGGREGATORS), QueryServicesOptions.withDefaults().getConfiguration());
+ this.clientAggregators = context.getAggregationManager().getAggregators();
+ }
+
+ @Override
+ public ResultIterator iterator() throws SQLException {
+ ResultIterator iterator = delegate.iterator();
+ if (where != null) {
+ iterator = new FilterResultIterator(iterator, where);
+ }
+
+ AggregatingResultIterator aggResultIterator;
+ if (groupBy.isEmpty()) {
+ aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
+ aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
+ } else {
+ if (!groupBy.isOrderPreserving()) {
+ int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+ QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+ List<Expression> keyExpressions = groupBy.getKeyExpressions();
+ List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
+ for (Expression keyExpression : keyExpressions) {
+ keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
+ }
+ iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, limit, projector.getEstimatedRowByteSize());
+ }
+ aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getExpressions());
+ aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
+ }
+
+ if (having != null) {
+ aggResultIterator = new FilterAggregatingResultIterator(aggResultIterator, having);
+ }
+
+ if (statement.isDistinct() && statement.isAggregate()) { // Dedup on client if select distinct and aggregation
+ aggResultIterator = new DistinctAggregatingResultIterator(aggResultIterator, getProjector());
+ }
+
+ ResultIterator resultScanner = aggResultIterator;
+ if (orderBy.getOrderByExpressions().isEmpty()) {
+ if (limit != null) {
+ resultScanner = new LimitingResultIterator(aggResultIterator, limit);
+ }
+ } else {
+ int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+ QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+ resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(), thresholdBytes, limit);
+ }
+ if (context.getSequenceManager().getSequenceCount() > 0) {
+ resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager());
+ }
+
+ return resultScanner;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
+ if (where != null) {
+ planSteps.add("CLIENT FILTER BY " + where.toString());
+ }
+ if (!groupBy.isEmpty()) {
+ if (!groupBy.isOrderPreserving()) {
+ planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString());
+ }
+ planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
+ } else {
+ planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
+ }
+ if (having != null) {
+ planSteps.add("CLIENT AFTER-AGGREGATION FILTER BY " + having.toString());
+ }
+ if (statement.isDistinct() && statement.isAggregate()) {
+ planSteps.add("CLIENT DISTINCT ON " + projector.toString());
+ }
+ if (orderBy.getOrderByExpressions().isEmpty()) {
+ if (limit != null) {
+ planSteps.add("CLIENT " + limit + " ROW LIMIT");
+ }
+ } else {
+ planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY " + orderBy.getOrderByExpressions().toString());
+ }
+ if (context.getSequenceManager().getSequenceCount() > 0) {
+ int nSequences = context.getSequenceManager().getSequenceCount();
+ planSteps.add("CLIENT RESERVE VALUES FROM " + nSequences + " SEQUENCE" + (nSequences == 1 ? "" : "S"));
+ }
+
+ return new ExplainPlan(planSteps);
+ }
+
+ @Override
+ public GroupBy getGroupBy() {
+ return groupBy;
+ }
+
+ private static class ClientGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
+ private final List<Expression> groupByExpressions;
+
+ public ClientGroupedAggregatingResultIterator(PeekingResultIterator iterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+ super(iterator, aggregators);
+ this.groupByExpressions = groupByExpressions;
+ }
+
+ @Override
+ protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
+ ImmutableBytesWritable ptr) throws SQLException {
+ try {
+ ImmutableBytesWritable key = TupleUtil.getConcatenatedValue(tuple, groupByExpressions);
+ ptr.set(key.get(), key.getOffset(), key.getLength());
+ return ptr;
+ } catch (IOException e) {
+ throw new SQLException(e);
+ }
+ }
+
+ @Override
+ protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
+ return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
+ }
+
+ @Override
+ public String toString() {
+ return "ClientGroupedAggregatingResultIterator [resultIterator="
+ + resultIterator + ", aggregators=" + aggregators + ", groupByExpressions="
+ + groupByExpressions + "]";
+ }
+ }
+
+ private static class ClientUngroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
+
+ public ClientUngroupedAggregatingResultIterator(PeekingResultIterator iterator, Aggregators aggregators) {
+ super(iterator, aggregators);
+ }
+
+ @Override
+ protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
+ ImmutableBytesWritable ptr) throws SQLException {
+ tuple.getKey(ptr);
+ return ptr;
+ }
+
+ @Override
+ protected Tuple wrapKeyValueAsResult(KeyValue keyValue)
+ throws SQLException {
+ return new MultiKeyValueTuple(Collections.<Cell> singletonList(keyValue));
+ }
+
+ @Override
+ public String toString() {
+ return "ClientUngroupedAggregatingResultIterator [resultIterator="
+ + resultIterator + ", aggregators=" + aggregators + "]";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
new file mode 100644
index 0000000..8e787b4
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientProcessingPlan.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.schema.TableRef;
+
+/**
+ * Query plan that performs WHERE filtering, ORDER BY sorting and LIMIT on the
+ * client side, for derived-table queries that cannot be flattened by SubselectRewriter.
+ */
+public abstract class ClientProcessingPlan extends DelegateQueryPlan {
+ protected final StatementContext context;
+ protected final FilterableStatement statement;
+ protected final TableRef table;
+ protected final RowProjector projector;
+ protected final Integer limit;
+ protected final Expression where;
+ protected final OrderBy orderBy;
+
+ public ClientProcessingPlan(StatementContext context, FilterableStatement statement, TableRef table,
+ RowProjector projector, Integer limit, Expression where, OrderBy orderBy, QueryPlan delegate) {
+ super(delegate);
+ this.context = context;
+ this.statement = statement;
+ this.table = table;
+ this.projector = projector;
+ this.limit = limit;
+ this.where = where;
+ this.orderBy = orderBy;
+ }
+
+ @Override
+ public StatementContext getContext() {
+ return context;
+ }
+
+ @Override
+ public TableRef getTableRef() {
+ return table;
+ }
+
+ @Override
+ public RowProjector getProjector() {
+ return projector;
+ }
+
+ @Override
+ public Integer getLimit() {
+ return limit;
+ }
+
+ @Override
+ public OrderBy getOrderBy() {
+ return orderBy;
+ }
+
+ @Override
+ public FilterableStatement getStatement() {
+ return statement;
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
new file mode 100644
index 0000000..01fbd11
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.FilterResultIterator;
+import org.apache.phoenix.iterate.LimitingResultIterator;
+import org.apache.phoenix.iterate.OrderedResultIterator;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SequenceResultIterator;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.TableRef;
+
+import com.google.common.collect.Lists;
+
+public class ClientScanPlan extends ClientProcessingPlan {
+
+ public ClientScanPlan(StatementContext context,
+ FilterableStatement statement, TableRef table,
+ RowProjector projector, Integer limit, Expression where,
+ OrderBy orderBy, QueryPlan delegate) {
+ super(context, statement, table, projector, limit, where, orderBy,
+ delegate);
+ }
+
+ @Override
+ public ResultIterator iterator() throws SQLException {
+ ResultIterator iterator = delegate.iterator();
+ if (where != null) {
+ iterator = new FilterResultIterator(iterator, where);
+ }
+
+ if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
+ int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+ QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+ iterator = new OrderedResultIterator(iterator, orderBy.getOrderByExpressions(), thresholdBytes, limit, projector.getEstimatedRowByteSize());
+ } else if (limit != null) {
+ iterator = new LimitingResultIterator(iterator, limit);
+ }
+
+ if (context.getSequenceManager().getSequenceCount() > 0) {
+ iterator = new SequenceResultIterator(iterator, context.getSequenceManager());
+ }
+
+ return iterator;
+ }
+
+ @Override
+ public ExplainPlan getExplainPlan() throws SQLException {
+ List<String> planSteps = Lists.newArrayList(delegate.getExplainPlan().getPlanSteps());
+ if (where != null) {
+ planSteps.add("CLIENT FILTER BY " + where.toString());
+ }
+ if (!orderBy.getOrderByExpressions().isEmpty()) {
+ planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY " + orderBy.getOrderByExpressions().toString());
+ } else if (limit != null) {
+ planSteps.add("CLIENT " + limit + " ROW LIMIT");
+ }
+ if (context.getSequenceManager().getSequenceCount() > 0) {
+ int nSequences = context.getSequenceManager().getSequenceCount();
+ planSteps.add("CLIENT RESERVE VALUES FROM " + nSequences + " SEQUENCE" + (nSequences == 1 ? "" : "S"));
+ }
+
+ return new ExplainPlan(planSteps);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index fce4245..c6ed0ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -51,6 +51,7 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.InListExpression;
import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
+import org.apache.phoenix.iterate.FilterResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
@@ -76,7 +77,7 @@ import com.google.common.collect.Lists;
public class HashJoinPlan extends DelegateQueryPlan {
private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
- private final FilterableStatement statement;
+ private final SelectStatement statement;
private final HashJoinInfo joinInfo;
private final SubPlan[] subPlans;
private final boolean recompileWhereClause;
@@ -88,14 +89,13 @@ public class HashJoinPlan extends DelegateQueryPlan {
private AtomicLong firstJobEndTime;
private List<Expression> keyRangeExpressions;
- public static HashJoinPlan create(FilterableStatement statement,
+ public static HashJoinPlan create(SelectStatement statement,
QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans) {
- if (plan instanceof BaseQueryPlan)
+ if (!(plan instanceof HashJoinPlan))
return new HashJoinPlan(statement, plan, joinInfo, subPlans, joinInfo == null);
- assert (plan instanceof HashJoinPlan);
HashJoinPlan hashJoinPlan = (HashJoinPlan) plan;
- assert hashJoinPlan.joinInfo == null;
+ assert (hashJoinPlan.joinInfo == null && hashJoinPlan.delegate instanceof BaseQueryPlan);
SubPlan[] mergedSubPlans = new SubPlan[hashJoinPlan.subPlans.length + subPlans.length];
int i = 0;
for (SubPlan subPlan : hashJoinPlan.subPlans) {
@@ -107,7 +107,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
return new HashJoinPlan(statement, hashJoinPlan.delegate, joinInfo, mergedSubPlans, true);
}
- private HashJoinPlan(FilterableStatement statement,
+ private HashJoinPlan(SelectStatement statement,
QueryPlan plan, HashJoinInfo joinInfo, SubPlan[] subPlans, boolean recompileWhereClause) {
super(plan);
this.statement = statement;
@@ -170,6 +170,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
throw firstException;
}
+ Expression postFilter = null;
boolean hasKeyRangeExpressions = keyRangeExpressions != null && !keyRangeExpressions.isEmpty();
if (recompileWhereClause || hasKeyRangeExpressions) {
StatementContext context = delegate.getContext();
@@ -177,10 +178,10 @@ public class HashJoinPlan extends DelegateQueryPlan {
ParseNode viewWhere = table.getViewStatement() == null ? null : new SQLParser(table.getViewStatement()).parseQuery().getWhere();
context.setResolver(FromCompiler.getResolverForQuery((SelectStatement) (delegate.getStatement()), delegate.getContext().getConnection()));
if (recompileWhereClause) {
- WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere);
+ postFilter = WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, null);
}
if (hasKeyRangeExpressions) {
- WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, true);
+ WhereCompiler.compile(delegate.getContext(), delegate.getStatement(), viewWhere, keyRangeExpressions, true, null);
}
}
@@ -189,7 +190,12 @@ public class HashJoinPlan extends DelegateQueryPlan {
HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
}
- return ((BaseQueryPlan) delegate).iterator(dependencies);
+ ResultIterator iterator = joinInfo == null ? delegate.iterator() : ((BaseQueryPlan) delegate).iterator(dependencies);
+ if (statement.getInnerSelectStatement() != null && postFilter != null) {
+ iterator = new FilterResultIterator(iterator, postFilter);
+ }
+
+ return iterator;
}
private Expression createKeyRangeExpression(Expression lhsExpression,
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f78a3d70/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index 410d386..c9cbd15 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -20,14 +20,12 @@ package org.apache.phoenix.execute;
import java.sql.SQLException;
import java.util.List;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.DelegateResultIterator;
+import org.apache.phoenix.iterate.FilterResultIterator;
import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.join.TupleProjector;
-import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.tuple.Tuple;
import com.google.common.collect.Lists;
@@ -49,52 +47,33 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
if (postFilter != null) {
planSteps.add("CLIENT FILTER BY " + postFilter.toString());
}
-
+
return new ExplainPlan(planSteps);
}
@Override
public ResultIterator iterator() throws SQLException {
- final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
-
- return new DelegateResultIterator(delegate.iterator()) {
+ ResultIterator iterator = new DelegateResultIterator(delegate.iterator()) {
@Override
public Tuple next() throws SQLException {
- Tuple tuple = null;
- while (tuple == null) {
- tuple = super.next();
- if (tuple == null) {
- break;
- }
-
- tuple = tupleProjector.projectResults(tuple);
-
- if (postFilter != null) {
- postFilter.reset();
- try {
- if (postFilter.evaluate(tuple, tempPtr)) {
- Boolean b = (Boolean)postFilter.getDataType().toObject(tempPtr);
- if (!b.booleanValue()) {
- tuple = null;
- }
- } else {
- tuple = null;
- }
- } catch (IllegalDataException e) {
- tuple = null;
- }
- }
- }
+ Tuple tuple = super.next();
+ if (tuple == null)
+ return null;
- return tuple;
+ return tupleProjector.projectResults(tuple);
}
@Override
public String toString() {
- return "TupleProjectionResultIterator [projector=" + tupleProjector + ", postFilter="
- + postFilter + "]";
+ return "TupleProjectionResultIterator [projector=" + tupleProjector + "]";
}
};
+
+ if (postFilter != null) {
+ iterator = new FilterResultIterator(iterator, postFilter);
+ }
+
+ return iterator;
}
}
[3/3] git commit: PHOENIX-944 Support derived tables in FROM clause
that needs extra steps of client-side aggregation or other processing
Posted by ma...@apache.org.
PHOENIX-944 Support derived tables in FROM clause that need extra steps of client-side aggregation or other processing
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/fba06a80
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/fba06a80
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/fba06a80
Branch: refs/heads/master
Commit: fba06a80f660462c3e1aa4ff7c8bac5d640168c0
Parents: f78a3d7
Author: maryannxue <ma...@apache.org>
Authored: Thu Oct 23 20:26:17 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Thu Oct 23 23:08:54 2014 -0400
----------------------------------------------------------------------
.../main/java/org/apache/phoenix/compile/QueryCompiler.java | 4 ++++
.../java/org/apache/phoenix/execute/ClientAggregatePlan.java | 4 +++-
.../org/apache/phoenix/iterate/LookAheadResultIterator.java | 6 +++---
3 files changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fba06a80/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 0eafcdb..214330c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.execute.ScanPlan;
import org.apache.phoenix.execute.TupleProjectionPlan;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
import org.apache.phoenix.expression.RowValueConstructorExpression;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
@@ -427,6 +428,9 @@ public class QueryCompiler {
}
if (innerPlan != null) {
+ if (LiteralExpression.isTrue(where)) {
+ where = null; // we do not pass "true" as filter
+ }
plan = select.isAggregate() || select.isDistinct() ?
new ClientAggregatePlan(context, select, tableRef, projector, limit, where, orderBy, groupBy, having, plan)
: new ClientScanPlan(context, select, tableRef, projector, limit, where, orderBy, plan);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fba06a80/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index a9347e1..59aab2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.execute;
+import static org.apache.phoenix.query.QueryConstants.*;
+
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
@@ -210,7 +212,7 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
@Override
protected ImmutableBytesWritable getGroupingKey(Tuple tuple,
ImmutableBytesWritable ptr) throws SQLException {
- tuple.getKey(ptr);
+ ptr.set(UNGROUPED_AGG_ROW_KEY);
return ptr;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/fba06a80/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index 971b1a5..3293f65 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -25,9 +25,9 @@ import org.apache.phoenix.schema.tuple.Tuple;
abstract public class LookAheadResultIterator implements PeekingResultIterator {
- public static LookAheadResultIterator wrap(final ResultIterator iterator) {
- if (iterator instanceof LookAheadResultIterator) {
- return (LookAheadResultIterator) iterator;
+ public static PeekingResultIterator wrap(final ResultIterator iterator) {
+ if (iterator instanceof PeekingResultIterator) {
+ return (PeekingResultIterator) iterator;
}
return new LookAheadResultIterator() {