You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/29 01:01:59 UTC
[04/15] PHOENIX-944 Support derived tables in FROM clause that needs
extra steps of client-side aggregation or other processing
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
new file mode 100644
index 0000000..14b488d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * Evaluates a fixed list of expressions against a {@link Tuple} and packs the results into a
+ * single byte array laid out according to a {@link KeyValueSchema}. The packed value is exposed
+ * as a one-cell {@link ProjectedValueTuple} stored under {@link #VALUE_COLUMN_FAMILY} and
+ * {@link #VALUE_COLUMN_QUALIFIER}.
+ * <p>
+ * A projector can be written into a {@link Scan} attribute and rebuilt from it with
+ * {@link #serializeProjectorIntoScan(Scan, TupleProjector)} and
+ * {@link #deserializeProjectorFromScan(Scan)}.
+ */
+public class TupleProjector {
+    /** Column family of the single KeyValue that carries a projected value. */
+    public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
+    /** Column qualifier (empty) of the single KeyValue that carries a projected value. */
+    public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
+
+    /** Name of the scan attribute that holds a serialized projector. */
+    private static final String SCAN_PROJECTOR = "scanProjector";
+
+    private final KeyValueSchema schema;
+    private final Expression[] expressions;
+    // Not final: callers may substitute their own bit set through setValueBitSet().
+    private ValueBitSet valueSet;
+    // Scratch pointer handed to schema.toBytes() on every projectResults() call.
+    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+
+    /**
+     * Builds a projector with one schema field per column projector of the given row projector.
+     *
+     * @param rowProjector supplies the expressions to evaluate, in projection order
+     */
+    public TupleProjector(RowProjector rowProjector) {
+        List<? extends ColumnProjector> columnProjectors = rowProjector.getColumnProjectors();
+        int count = columnProjectors.size();
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        expressions = new Expression[count];
+        for (int i = 0; i < count; i++) {
+            Expression expression = columnProjectors.get(i).getExpression();
+            builder.addField(expression);
+            expressions[i] = expression;
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
+
+    /**
+     * Builds a projector over the non-primary-key columns of a projected table. Each such column
+     * becomes one schema field and is paired with its source expression.
+     *
+     * @param projected wrapper providing the projected table and the source expression per column
+     */
+    public TupleProjector(ProjectedPTableWrapper projected) {
+        List<PColumn> columns = projected.getTable().getColumns();
+        expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
+        // Pass 0 rather than a computed minNullableIndex, since this projection may later be
+        // merged with another one.
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        int i = 0;
+        for (PColumn column : columns) {
+            if (!SchemaUtil.isPKColumn(column)) {
+                builder.addField(column);
+                expressions[i++] = projected.getSourceExpression(column);
+            }
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
+
+    /** Used by {@link #deserializeProjectorFromScan(Scan)} to rebuild a projector from its parts. */
+    private TupleProjector(KeyValueSchema schema, Expression[] expressions) {
+        this.schema = schema;
+        this.expressions = expressions;
+        this.valueSet = ValueBitSet.newInstance(schema);
+    }
+
+    /**
+     * Replaces the bit set used by {@link #projectResults(Tuple)}.
+     *
+     * @param bitSet the bit set to use from now on
+     */
+    public void setValueBitSet(ValueBitSet bitSet) {
+        this.valueSet = bitSet;
+    }
+
+    /**
+     * Serializes the projector's schema and expressions and stores the bytes on the scan under
+     * the {@code scanProjector} attribute.
+     *
+     * @param scan      the scan that receives the attribute
+     * @param projector the projector to serialize
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing
+     */
+    public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            projector.schema.write(output);
+            int count = projector.expressions.length;
+            WritableUtils.writeVInt(output, count);
+            for (int i = 0; i < count; i++) {
+                // The expression type is written as its enum ordinal;
+                // deserializeProjectorFromScan() reads it back the same way.
+                WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
+                projector.expressions[i].write(output);
+            }
+            scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+    /**
+     * Rebuilds a projector from the {@code scanProjector} attribute of the scan.
+     *
+     * @param scan the scan to read the attribute from
+     * @return the deserialized projector, or {@code null} when the scan carries no such attribute
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    public static TupleProjector deserializeProjectorFromScan(Scan scan) {
+        byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+        if (proj == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            KeyValueSchema schema = new KeyValueSchema();
+            schema.readFields(input);
+            int count = WritableUtils.readVInt(input);
+            Expression[] expressions = new Expression[count];
+            for (int i = 0; i < count; i++) {
+                // Mirror of the write side: ordinal first, then the expression's own fields.
+                int ordinal = WritableUtils.readVInt(input);
+                expressions[i] = ExpressionType.values()[ordinal].newInstance();
+                expressions[i].readFields(input);
+            }
+            return new TupleProjector(schema, expressions);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * A tuple holding exactly one value: the packed projection bytes, keyed by the row key and
+     * timestamp it was created with.
+     */
+    public static class ProjectedValueTuple extends BaseTuple {
+        private final ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
+        private final long timestamp;
+        private final byte[] projectedValue;
+        private final int bitSetLen;
+        // Created lazily by getValue(byte[], byte[]) and cached afterwards.
+        private KeyValue keyValue;
+
+        private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+            this.keyPtr.set(keyBuffer, keyOffset, keyLength);
+            this.timestamp = timestamp;
+            this.projectedValue = projectedValue;
+            this.bitSetLen = bitSetLen;
+        }
+
+        /** @return the pointer to the row key; this is the internal instance, not a copy */
+        public ImmutableBytesWritable getKeyPtr() {
+            return keyPtr;
+        }
+
+        /** @return the timestamp this tuple was created with */
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        /** @return the packed projection bytes; this is the internal array, not a copy */
+        public byte[] getProjectedValue() {
+            return projectedValue;
+        }
+
+        /** @return the bit set length, in bytes, recorded when this tuple was created */
+        public int getBitSetLength() {
+            return bitSetLen;
+        }
+
+        @Override
+        public void getKey(ImmutableBytesWritable ptr) {
+            ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
+        }
+
+        /**
+         * @throws IndexOutOfBoundsException for any index other than 0, since the tuple holds a
+         *         single value
+         */
+        @Override
+        public KeyValue getValue(int index) {
+            if (index != 0) {
+                throw new IndexOutOfBoundsException(Integer.toString(index));
+            }
+            return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
+        }
+
+        /**
+         * Returns the single KeyValue of this tuple. The {@code family} and {@code qualifier}
+         * arguments are ignored; the KeyValue is always built with {@link #VALUE_COLUMN_FAMILY}
+         * and {@link #VALUE_COLUMN_QUALIFIER}.
+         */
+        @Override
+        public KeyValue getValue(byte[] family, byte[] qualifier) {
+            if (keyValue == null) {
+                keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
+                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+            }
+            return keyValue;
+        }
+
+        /**
+         * Points {@code ptr} at the whole projected value and returns {@code true}. The
+         * {@code family} and {@code qualifier} arguments are ignored.
+         */
+        @Override
+        public boolean getValue(byte[] family, byte[] qualifier,
+                ImmutableBytesWritable ptr) {
+            ptr.set(projectedValue);
+            return true;
+        }
+
+        @Override
+        public boolean isImmutable() {
+            return true;
+        }
+
+        @Override
+        public int size() {
+            return 1;
+        }
+    }
+
+    /**
+     * Evaluates this projector's expressions against the tuple and wraps the packed bytes in a
+     * {@link ProjectedValueTuple}. The row key and timestamp are taken from the first KeyValue of
+     * the input tuple.
+     *
+     * @param tuple the tuple to project
+     * @return a one-value tuple carrying the packed projection
+     */
+    public ProjectedValueTuple projectResults(Tuple tuple) {
+        byte[] bytesValue = schema.toBytes(tuple, getExpressions(), valueSet, ptr);
+        KeyValue base = tuple.getValue(0);
+        return new ProjectedValueTuple(base.getBuffer(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+    }
+
+    /**
+     * Points {@code ptr} at the projected value stored in the tuple under
+     * {@link #VALUE_COLUMN_FAMILY} / {@link #VALUE_COLUMN_QUALIFIER}.
+     *
+     * @param tuple the tuple expected to hold a projected value
+     * @param ptr   receives the location of the projected value
+     * @throws IOException if the tuple reports that it has no such value
+     */
+    public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
+        boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
+        if (!b)
+            throw new IOException("Trying to decode a non-projected value.");
+    }
+
+    /**
+     * Concatenates two projected values into a new {@link ProjectedValueTuple} that keeps the row
+     * key and timestamp of {@code dest}.
+     * <p>
+     * Each input is treated as "data bytes followed by a trailing bit set". The result is the
+     * data bytes of {@code dest}, then the data bytes of {@code src}, then a single combined bit
+     * set in which every bit set in the source appears shifted by {@code offset}.
+     *
+     * @param dest       projected value whose data comes first in the result
+     * @param destSchema unused by this method
+     * @param destBitSet scratch bit set; cleared, loaded from {@code dest} and then extended
+     * @param src        tuple holding the projected value to append
+     * @param srcSchema  unused by this method
+     * @param srcBitSet  scratch bit set; cleared and loaded from {@code src}
+     * @param offset     position in the combined bit set at which the source bits start
+     * @return the merged projected value
+     * @throws IOException if {@code src} does not hold a projected value
+     */
+    public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
+            Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
+        ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
+        destBitSet.clear();
+        destBitSet.or(destValue);
+        int origDestBitSetLen = dest.getBitSetLength();
+        ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
+        decodeProjectedValue(src, srcValue);
+        srcBitSet.clear();
+        srcBitSet.or(srcValue);
+        int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+        // getMaxSetBit() is taken to be the (inclusive) index of the highest set bit, so the
+        // bound must be <=; with < the last source bit was never carried into the destination.
+        int maxSrcBit = srcBitSet.getMaxSetBit();
+        for (int i = 0; i <= maxSrcBit; i++) {
+            if (srcBitSet.get(i)) {
+                destBitSet.set(offset + i);
+            }
+        }
+        int destBitSetLen = destBitSet.getEstimatedLength();
+        // Both inputs lose their own trailing bit set; one combined bit set is appended instead.
+        byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
+        int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
+        o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
+        destBitSet.toBytes(merged, o);
+        ImmutableBytesWritable keyPtr = dest.getKeyPtr();
+        return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
+    }
+
+    /** @return the schema used to lay out projected values */
+    public KeyValueSchema getSchema() {
+        return schema;
+    }
+
+    /** @return the expressions evaluated by this projector; this is the internal array, not a copy */
+    public Expression[] getExpressions() {
+        return expressions;
+    }
+
+    /** @return the bit set currently used by {@link #projectResults(Tuple)} */
+    public ValueBitSet getValueBitSet() {
+        return valueSet;
+    }
+
+    @Override
+    public String toString() {
+        return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index dcac849..10657e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -22,8 +22,8 @@ import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.visitor.ExpressionVisitor;
-import org.apache.phoenix.join.TupleProjector;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
import org.apache.phoenix.schema.PColumn;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
index fefb077..2af99ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.PArrayDataType;
import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PhoenixArray;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
@@ -37,15 +36,15 @@ public class DistinctValueClientAggregator extends DistinctValueWithCountClientA
@Override
public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
- if (buffer == null || buffer.length == 0) {
+ if (cachedResult == null) {
Object[] values = new Object[valueVsCount.size()];
int i = 0;
for (ImmutableBytesPtr key : valueVsCount.keySet()) {
values[i++] = valueType.toObject(key, sortOrder);
}
- PhoenixArray array = PArrayDataType.instantiatePhoenixArray(valueType, values);
- buffer = resultType.toBytes(array, sortOrder);
+ cachedResult = PArrayDataType.instantiatePhoenixArray(valueType, values);
}
+ buffer = resultType.toBytes(cachedResult, sortOrder);
ptr.set(buffer);
return true;
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
new file mode 100644
index 0000000..8fd36b3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+
+/**
+ *
+ * Base class for result scanners that aggregate the row count value for rows with
+ * duplicate keys. This result scanner assumes that the results of the inner result
+ * scanner are returned in order of grouping keys.
+ *
+ */
+public abstract class BaseGroupedAggregatingResultIterator implements
+ AggregatingResultIterator {
+ private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+ protected final PeekingResultIterator resultIterator;
+ protected final Aggregators aggregators;
+ private ImmutableBytesWritable currentKey;
+ private ImmutableBytesWritable nextKey;
+
+ public BaseGroupedAggregatingResultIterator(
+ PeekingResultIterator resultIterator, Aggregators aggregators) {
+ if (resultIterator == null) throw new NullPointerException();
+ if (aggregators == null) throw new NullPointerException();
+ this.resultIterator = resultIterator;
+ this.aggregators = aggregators;
+ this.currentKey = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
+ this.nextKey = new ImmutableBytesWritable(UNITIALIZED_KEY_BUFFER);
+ }
+
+ protected abstract ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException;
+ protected abstract Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException;
+
+ @Override
+ public Tuple next() throws SQLException {
+ Tuple result = resultIterator.next();
+ if (result == null) {
+ return null;
+ }
+ if (currentKey.get() == UNITIALIZED_KEY_BUFFER) {
+ getGroupingKey(result, currentKey);
+ }
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ while (true) {
+ aggregators.aggregate(rowAggregators, result);
+ Tuple nextResult = resultIterator.peek();
+ if (nextResult == null || !currentKey.equals(getGroupingKey(nextResult, nextKey))) {
+ break;
+ }
+ result = resultIterator.next();
+ }
+
+ byte[] value = aggregators.toBytes(rowAggregators);
+ Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(currentKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+ currentKey.set(nextKey.get(), nextKey.getOffset(), nextKey.getLength());
+ return tuple;
+ }
+
+ @Override
+ public void close() throws SQLException {
+ resultIterator.close();
+ }
+
+ @Override
+ public void aggregate(Tuple result) {
+ Aggregator[] rowAggregators = aggregators.getAggregators();
+ aggregators.reset(rowAggregators);
+ aggregators.aggregate(rowAggregators, result);
+ }
+
+ @Override
+ public void explain(List<String> planSteps) {
+ resultIterator.explain(planSteps);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
index 50e1bc2..db08696 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
@@ -17,19 +17,13 @@
*/
package org.apache.phoenix.iterate;
-import static org.apache.phoenix.query.QueryConstants.*;
-
import java.sql.SQLException;
-import java.util.List;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.Aggregators;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.TupleUtil;
@@ -51,54 +45,20 @@ import org.apache.phoenix.util.TupleUtil;
*
* @since 0.1
*/
-public class GroupedAggregatingResultIterator implements AggregatingResultIterator {
- private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
- private final PeekingResultIterator resultIterator;
- protected final Aggregators aggregators;
-
- public GroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
- if (resultIterator == null) throw new NullPointerException();
- if (aggregators == null) throw new NullPointerException();
- this.resultIterator = resultIterator;
- this.aggregators = aggregators;
- }
-
- @Override
- public Tuple next() throws SQLException {
- Tuple result = resultIterator.next();
- if (result == null) {
- return null;
- }
- Aggregator[] rowAggregators = aggregators.getAggregators();
- aggregators.reset(rowAggregators);
- while (true) {
- aggregators.aggregate(rowAggregators, result);
- Tuple nextResult = resultIterator.peek();
- if (nextResult == null || !TupleUtil.equals(result, nextResult, tempPtr)) {
- break;
- }
- result = resultIterator.next();
- }
-
- byte[] value = aggregators.toBytes(rowAggregators);
- result.getKey(tempPtr);
- return new SingleKeyValueTuple(KeyValueUtil.newKeyValue(tempPtr, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
- }
-
- @Override
- public void close() throws SQLException {
- resultIterator.close();
+public class GroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
+
+ public GroupedAggregatingResultIterator(PeekingResultIterator resultIterator, Aggregators aggregators) {
+ super(resultIterator, aggregators);
}
@Override
- public void aggregate(Tuple result) {
- Aggregator[] rowAggregators = aggregators.getAggregators();
- aggregators.reset(rowAggregators);
- aggregators.aggregate(rowAggregators, result);
+ protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
+ tuple.getKey(ptr);
+ return ptr;
}
@Override
- public void explain(List<String> planSteps) {
- resultIterator.explain(planSteps);
+ protected Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException {
+ return new SingleKeyValueTuple(keyValue);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index a7f390f..3293f65 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -25,7 +25,11 @@ import org.apache.phoenix.schema.tuple.Tuple;
abstract public class LookAheadResultIterator implements PeekingResultIterator {
- public static LookAheadResultIterator wrap(final ResultIterator iterator) {
+ public static PeekingResultIterator wrap(final ResultIterator iterator) {
+ if (iterator instanceof PeekingResultIterator) {
+ return (PeekingResultIterator) iterator;
+ }
+
return new LookAheadResultIterator() {
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
deleted file mode 100644
index 8377b03..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.join;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.ExpressionType;
-import org.apache.phoenix.schema.KeyValueSchema;
-import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.ValueBitSet;
-import org.apache.phoenix.schema.tuple.BaseTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-public class TupleProjector {
- public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
- public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
-
- private static final String SCAN_PROJECTOR = "scanProjector";
-
- private final KeyValueSchema schema;
- private final Expression[] expressions;
- private ValueBitSet valueSet;
- private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-
- public TupleProjector(ProjectedPTableWrapper projected) {
- List<PColumn> columns = projected.getTable().getColumns();
- expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
- // we do not count minNullableIndex for we might do later merge.
- KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
- int i = 0;
- for (PColumn column : projected.getTable().getColumns()) {
- if (!SchemaUtil.isPKColumn(column)) {
- builder.addField(column);
- expressions[i++] = projected.getSourceExpression(column);
- }
- }
- schema = builder.build();
- valueSet = ValueBitSet.newInstance(schema);
- }
-
- private TupleProjector(KeyValueSchema schema, Expression[] expressions) {
- this.schema = schema;
- this.expressions = expressions;
- this.valueSet = ValueBitSet.newInstance(schema);
- }
-
- public void setValueBitSet(ValueBitSet bitSet) {
- this.valueSet = bitSet;
- }
-
- public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) {
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- try {
- DataOutputStream output = new DataOutputStream(stream);
- projector.schema.write(output);
- int count = projector.expressions.length;
- WritableUtils.writeVInt(output, count);
- for (int i = 0; i < count; i++) {
- WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
- projector.expressions[i].write(output);
- }
- scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- stream.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- }
-
- public static TupleProjector deserializeProjectorFromScan(Scan scan) {
- byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
- if (proj == null) {
- return null;
- }
- ByteArrayInputStream stream = new ByteArrayInputStream(proj);
- try {
- DataInputStream input = new DataInputStream(stream);
- KeyValueSchema schema = new KeyValueSchema();
- schema.readFields(input);
- int count = WritableUtils.readVInt(input);
- Expression[] expressions = new Expression[count];
- for (int i = 0; i < count; i++) {
- int ordinal = WritableUtils.readVInt(input);
- expressions[i] = ExpressionType.values()[ordinal].newInstance();
- expressions[i].readFields(input);
- }
- return new TupleProjector(schema, expressions);
- } catch (IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- stream.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public static class ProjectedValueTuple extends BaseTuple {
- private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
- private long timestamp;
- private byte[] projectedValue;
- private int bitSetLen;
- private KeyValue keyValue;
-
- private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
- this.keyPtr.set(keyBuffer, keyOffset, keyLength);
- this.timestamp = timestamp;
- this.projectedValue = projectedValue;
- this.bitSetLen = bitSetLen;
- }
-
- public ImmutableBytesWritable getKeyPtr() {
- return keyPtr;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public byte[] getProjectedValue() {
- return projectedValue;
- }
-
- public int getBitSetLength() {
- return bitSetLen;
- }
-
- @Override
- public void getKey(ImmutableBytesWritable ptr) {
- ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
- }
-
- @Override
- public KeyValue getValue(int index) {
- if (index != 0) {
- throw new IndexOutOfBoundsException(Integer.toString(index));
- }
- return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
- }
-
- @Override
- public KeyValue getValue(byte[] family, byte[] qualifier) {
- if (keyValue == null) {
- keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(),
- VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
- }
- return keyValue;
- }
-
- @Override
- public boolean getValue(byte[] family, byte[] qualifier,
- ImmutableBytesWritable ptr) {
- ptr.set(projectedValue);
- return true;
- }
-
- @Override
- public boolean isImmutable() {
- return true;
- }
-
- @Override
- public int size() {
- return 1;
- }
- }
-
- public ProjectedValueTuple projectResults(Tuple tuple) {
- byte[] bytesValue = schema.toBytes(tuple, expressions, valueSet, ptr);
- KeyValue base = tuple.getValue(0);
- return new ProjectedValueTuple(base.getBuffer(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
- }
-
- public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
- boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
- if (!b)
- throw new IOException("Trying to decode a non-projected value.");
- }
-
- public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
- Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
- ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
- destBitSet.clear();
- destBitSet.or(destValue);
- int origDestBitSetLen = dest.getBitSetLength();
- ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
- decodeProjectedValue(src, srcValue);
- srcBitSet.clear();
- srcBitSet.or(srcValue);
- int origSrcBitSetLen = srcBitSet.getEstimatedLength();
- for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
- if (srcBitSet.get(i)) {
- destBitSet.set(offset + i);
- }
- }
- int destBitSetLen = destBitSet.getEstimatedLength();
- byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
- int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
- o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
- destBitSet.toBytes(merged, o);
- ImmutableBytesWritable keyPtr = dest.getKeyPtr();
- return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
- }
-
- @Override
- public String toString() {
- return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 132c831..c7bc944 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -101,6 +101,7 @@ public class QueryOptimizer {
if (!useIndexes
|| select.isJoin()
|| dataPlan.getContext().getResolver().getTables().size() > 1
+ || select.getInnerSelectStatement() != null
|| (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan)) {
return Collections.singletonList(dataPlan);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 2242cd0..5aaf04d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -629,8 +629,8 @@ public class ParseNodeFactory {
statement.hasSequence());
}
- public SelectStatement select(SelectStatement statement, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) {
- return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), select, where, groupBy,
+ public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) {
+ return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, groupBy,
statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), isAggregate,
statement.hasSequence());
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index 6cee588..e7302dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -192,4 +192,11 @@ public class SelectStatement implements FilterableStatement {
public boolean isJoin() {
return fromTable.size() > 1 || (fromTable.size() > 0 && fromTable.get(0) instanceof JoinTableNode);
}
+
+ public SelectStatement getInnerSelectStatement() {
+ if (fromTable.size() != 1 || !(fromTable.get(0) instanceof DerivedTableNode))
+ return null;
+
+ return ((DerivedTableNode) fromTable.get(0)).getSelect();
+ }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 0910712..f1a0028 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -114,7 +114,7 @@ public final class ColumnRef {
return new KeyValueColumnExpression(column, displayName);
}
- if (table.getType() == PTableType.JOIN) {
+ if (table.getType() == PTableType.JOIN || table.getType() == PTableType.SUBQUERY) {
return new ProjectedColumnExpression(column, table, column.getName().getString());
}