Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/29 01:01:59 UTC

[04/15] PHOENIX-944 Support derived tables in FROM clause that need extra steps of client-side aggregation or other processing

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
new file mode 100644
index 0000000..14b488d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.ColumnProjector;
+import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class TupleProjector {    
+    public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
+    public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
+    
+    private static final String SCAN_PROJECTOR = "scanProjector";
+    
+    private final KeyValueSchema schema;
+    private final Expression[] expressions;
+    private ValueBitSet valueSet;
+    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+    
+    public TupleProjector(RowProjector rowProjector) {
+        List<? extends ColumnProjector> columnProjectors = rowProjector.getColumnProjectors();
+        int count = columnProjectors.size();
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        expressions = new Expression[count];
+        for (int i = 0; i < count; i++) {
+            Expression expression = columnProjectors.get(i).getExpression();
+            builder.addField(expression);
+            expressions[i] = expression;
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    public TupleProjector(ProjectedPTableWrapper projected) {
+    	List<PColumn> columns = projected.getTable().getColumns();
+    	expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
+    	// Don't set minNullableIndex here, since this projected value may be merged with another later.
+    	KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+    	int i = 0;
+        for (PColumn column : projected.getTable().getColumns()) {
+        	if (!SchemaUtil.isPKColumn(column)) {
+        		builder.addField(column);
+        		expressions[i++] = projected.getSourceExpression(column);
+        	}
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    private TupleProjector(KeyValueSchema schema, Expression[] expressions) {
+    	this.schema = schema;
+    	this.expressions = expressions;
+    	this.valueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    public void setValueBitSet(ValueBitSet bitSet) {
+        this.valueSet = bitSet;
+    }
+    
+    public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            projector.schema.write(output);
+            int count = projector.expressions.length;
+            WritableUtils.writeVInt(output, count);
+            for (int i = 0; i < count; i++) {
+            	WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
+            	projector.expressions[i].write(output);
+            }
+            scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        
+    }
+    
+    public static TupleProjector deserializeProjectorFromScan(Scan scan) {
+        byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+        if (proj == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            KeyValueSchema schema = new KeyValueSchema();
+            schema.readFields(input);
+            int count = WritableUtils.readVInt(input);
+            Expression[] expressions = new Expression[count];
+            for (int i = 0; i < count; i++) {
+            	int ordinal = WritableUtils.readVInt(input);
+            	expressions[i] = ExpressionType.values()[ordinal].newInstance();
+            	expressions[i].readFields(input);
+            }
+            return new TupleProjector(schema, expressions);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    public static class ProjectedValueTuple extends BaseTuple {
+        private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
+        private long timestamp;
+        private byte[] projectedValue;
+        private int bitSetLen;
+        private KeyValue keyValue;
+
+        private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+            this.keyPtr.set(keyBuffer, keyOffset, keyLength);
+            this.timestamp = timestamp;
+            this.projectedValue = projectedValue;
+            this.bitSetLen = bitSetLen;
+        }
+        
+        public ImmutableBytesWritable getKeyPtr() {
+            return keyPtr;
+        }
+        
+        public long getTimestamp() {
+            return timestamp;
+        }
+        
+        public byte[] getProjectedValue() {
+            return projectedValue;
+        }
+        
+        public int getBitSetLength() {
+            return bitSetLen;
+        }
+        
+        @Override
+        public void getKey(ImmutableBytesWritable ptr) {
+            ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
+        }
+
+        @Override
+        public KeyValue getValue(int index) {
+            if (index != 0) {
+                throw new IndexOutOfBoundsException(Integer.toString(index));
+            }
+            return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
+        }
+
+        @Override
+        public KeyValue getValue(byte[] family, byte[] qualifier) {
+            if (keyValue == null) {
+                keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), 
+                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+            }
+            return keyValue;
+        }
+
+        @Override
+        public boolean getValue(byte[] family, byte[] qualifier,
+                ImmutableBytesWritable ptr) {
+            ptr.set(projectedValue);
+            return true;
+        }
+
+        @Override
+        public boolean isImmutable() {
+            return true;
+        }
+
+        @Override
+        public int size() {
+            return 1;
+        }
+    }
+    
+    public ProjectedValueTuple projectResults(Tuple tuple) {
+    	byte[] bytesValue = schema.toBytes(tuple, getExpressions(), valueSet, ptr);
+    	KeyValue base = tuple.getValue(0);
+        return new ProjectedValueTuple(base.getBuffer(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+    }
+    
+    public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
+    	boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
+        if (!b)
+            throw new IOException("Trying to decode a non-projected value.");
+    }
+    
+    public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
+    		Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
+    	ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
+    	destBitSet.clear();
+    	destBitSet.or(destValue);
+    	int origDestBitSetLen = dest.getBitSetLength();
+    	ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
+    	decodeProjectedValue(src, srcValue);
+    	srcBitSet.clear();
+    	srcBitSet.or(srcValue);
+    	int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+    	for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
+    		if (srcBitSet.get(i)) {
+    			destBitSet.set(offset + i);
+    		}
+    	}
+    	int destBitSetLen = destBitSet.getEstimatedLength();
+    	byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
+    	int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
+    	o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
+    	destBitSet.toBytes(merged, o);
+    	ImmutableBytesWritable keyPtr = dest.getKeyPtr();
+        return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
+    }
+
+    public KeyValueSchema getSchema() {
+        return schema;
+    }
+
+    public Expression[] getExpressions() {
+        return expressions;
+    }
+
+    public ValueBitSet getValueBitSet() {
+        return valueSet;
+    }
+    
+    @Override
+    public String toString() {
+        return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
+    }
+}
+
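
The pieces above are meant to be used as a pair: the client serializes the projector into the Scan, and the other side deserializes it and projects each scanned row into a single packed "_v" KeyValue. A minimal sketch of that round trip, assuming a RowProjector and Scan already produced by query compilation and a scanned Tuple with at least one KeyValue (the helper name and variables are illustrative, not part of the patch):

    // Illustrative helper, not part of the patch; uses only methods defined in the file above.
    static byte[] attachAndProject(Scan scan, RowProjector rowProjector, Tuple scannedRow) {
        TupleProjector projector = new TupleProjector(rowProjector);
        // Ship the KeyValueSchema plus expressions as a scan attribute.
        TupleProjector.serializeProjectorIntoScan(scan, projector);

        // Read it back on the other side of the wire.
        TupleProjector readBack = TupleProjector.deserializeProjectorFromScan(scan);
        if (readBack == null) {
            return null;
        }
        // Evaluate the expressions against the scanned row and pack the results
        // into a single value stored under the "_v" column family.
        TupleProjector.ProjectedValueTuple projected = readBack.projectResults(scannedRow);
        return projected.getProjectedValue();
    }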

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index dcac849..10657e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -22,8 +22,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
-import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.PColumn;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
index fefb077..2af99ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueClientAggregator.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.PArrayDataType;
 import org.apache.phoenix.schema.PDataType;
-import org.apache.phoenix.schema.PhoenixArray;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
 
@@ -37,15 +36,15 @@ public class DistinctValueClientAggregator extends DistinctValueWithCountClientA
 
     @Override
     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
-        if (buffer == null || buffer.length == 0) {            
+        if (cachedResult == null) {            
             Object[] values = new Object[valueVsCount.size()];
             int i = 0;
             for (ImmutableBytesPtr key : valueVsCount.keySet()) {
                 values[i++] = valueType.toObject(key, sortOrder);
             }
-            PhoenixArray array = PArrayDataType.instantiatePhoenixArray(valueType, values);
-            buffer = resultType.toBytes(array, sortOrder);
+            cachedResult = PArrayDataType.instantiatePhoenixArray(valueType, values);
         }
+        buffer = resultType.toBytes(cachedResult, sortOrder);
         ptr.set(buffer);
         return true;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
new file mode 100644
index 0000000..8fd36b3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseGroupedAggregatingResultIterator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+
+/**
+ * 
+ * Base class for result scanners that aggregate values for rows sharing the same
+ * grouping key. This result scanner assumes that the results of the inner result
+ * scanner are returned in order of grouping keys.
+ * 
+ */
+public abstract class BaseGroupedAggregatingResultIterator implements
+        AggregatingResultIterator {
+    private static final byte[] UNINITIALIZED_KEY_BUFFER = new byte[0];
+    protected final PeekingResultIterator resultIterator;
+    protected final Aggregators aggregators;
+    private ImmutableBytesWritable currentKey;
+    private ImmutableBytesWritable nextKey;    
+
+    public BaseGroupedAggregatingResultIterator(
+            PeekingResultIterator resultIterator, Aggregators aggregators) {
+        if (resultIterator == null) throw new NullPointerException();
+        if (aggregators == null) throw new NullPointerException();
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.currentKey = new ImmutableBytesWritable(UNINITIALIZED_KEY_BUFFER);
+        this.nextKey = new ImmutableBytesWritable(UNINITIALIZED_KEY_BUFFER);
+    }
+    
+    protected abstract ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException;
+    protected abstract Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException;
+
+    @Override
+    public Tuple next() throws SQLException {
+        Tuple result = resultIterator.next();
+        if (result == null) {
+            return null;
+        }
+        if (currentKey.get() == UNINITIALIZED_KEY_BUFFER) {
+            getGroupingKey(result, currentKey);
+        }
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        while (true) {
+            aggregators.aggregate(rowAggregators, result);
+            Tuple nextResult = resultIterator.peek();
+            if (nextResult == null || !currentKey.equals(getGroupingKey(nextResult, nextKey))) {
+                break;
+            }
+            result = resultIterator.next();
+        }
+        
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(currentKey, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        currentKey.set(nextKey.get(), nextKey.getOffset(), nextKey.getLength());
+        return tuple;
+    }
+    
+    @Override
+    public void close() throws SQLException {
+        resultIterator.close();
+    }
+    
+    @Override
+    public void aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+}
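
The refactored GroupedAggregatingResultIterator below is the row-key-grouped case of this base class. Any other grouping only needs the same two hooks; for instance, a hypothetical subclass (illustrative only, not part of the patch; imports mirror those of GroupedAggregatingResultIterator) that groups on a fixed-length prefix of the row key could look like:

    // Hypothetical subclass for illustration; class and field names are not from the patch.
    public class KeyPrefixGroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
        private final int prefixLength;

        public KeyPrefixGroupedAggregatingResultIterator(PeekingResultIterator delegate,
                Aggregators aggregators, int prefixLength) {
            super(delegate, aggregators);
            this.prefixLength = prefixLength;
        }

        @Override
        protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) {
            tuple.getKey(ptr);
            // Narrow the pointer to the grouping prefix; consecutive rows sharing it are aggregated together.
            ptr.set(ptr.get(), ptr.getOffset(), Math.min(prefixLength, ptr.getLength()));
            return ptr;
        }

        @Override
        protected Tuple wrapKeyValueAsResult(KeyValue keyValue) {
            return new SingleKeyValueTuple(keyValue);
        }
    }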

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
index 50e1bc2..db08696 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java
@@ -17,19 +17,13 @@
  */
 package org.apache.phoenix.iterate;
 
-import static org.apache.phoenix.query.QueryConstants.*;
-
 import java.sql.SQLException;
-import java.util.List;
 
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
-import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.Aggregators;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.TupleUtil;
 
 
 
@@ -51,54 +45,20 @@ import org.apache.phoenix.util.TupleUtil;
  * 
  * @since 0.1
  */
-public class GroupedAggregatingResultIterator implements AggregatingResultIterator {
-    private final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
-    private final PeekingResultIterator resultIterator;
-    protected final Aggregators aggregators;
-    
-    public GroupedAggregatingResultIterator( PeekingResultIterator resultIterator, Aggregators aggregators) {
-        if (resultIterator == null) throw new NullPointerException();
-        if (aggregators == null) throw new NullPointerException();
-        this.resultIterator = resultIterator;
-        this.aggregators = aggregators;
-    }
-    
-    @Override
-    public Tuple next() throws SQLException {
-        Tuple result = resultIterator.next();
-        if (result == null) {
-            return null;
-        }
-        Aggregator[] rowAggregators = aggregators.getAggregators();
-        aggregators.reset(rowAggregators);
-        while (true) {
-            aggregators.aggregate(rowAggregators, result);
-            Tuple nextResult = resultIterator.peek();
-            if (nextResult == null || !TupleUtil.equals(result, nextResult, tempPtr)) {
-                break;
-            }
-            result = resultIterator.next();
-        }
-        
-        byte[] value = aggregators.toBytes(rowAggregators);
-        result.getKey(tempPtr);
-        return new SingleKeyValueTuple(KeyValueUtil.newKeyValue(tempPtr, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
-    }
-    
-    @Override
-    public void close() throws SQLException {
-        resultIterator.close();
+public class GroupedAggregatingResultIterator extends BaseGroupedAggregatingResultIterator {
+
+    public GroupedAggregatingResultIterator(PeekingResultIterator resultIterator, Aggregators aggregators) {
+        super(resultIterator, aggregators);
     }
     
     @Override
-    public void aggregate(Tuple result) {
-        Aggregator[] rowAggregators = aggregators.getAggregators();
-        aggregators.reset(rowAggregators);
-        aggregators.aggregate(rowAggregators, result);
+    protected ImmutableBytesWritable getGroupingKey(Tuple tuple, ImmutableBytesWritable ptr) throws SQLException {
+        tuple.getKey(ptr);
+        return ptr;
     }
 
     @Override
-    public void explain(List<String> planSteps) {
-        resultIterator.explain(planSteps);
+    protected Tuple wrapKeyValueAsResult(KeyValue keyValue) throws SQLException {
+        return new SingleKeyValueTuple(keyValue);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
index a7f390f..3293f65 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java
@@ -25,7 +25,11 @@ import org.apache.phoenix.schema.tuple.Tuple;
 
 
 abstract public class LookAheadResultIterator implements PeekingResultIterator {
-    public static LookAheadResultIterator wrap(final ResultIterator iterator) {
+    public static PeekingResultIterator wrap(final ResultIterator iterator) {
+        if (iterator instanceof PeekingResultIterator) {
+            return (PeekingResultIterator) iterator;
+        }
+        
         return new LookAheadResultIterator() {
 
             @Override
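
With this change wrap() is effectively idempotent: an iterator that can already peek is handed back as-is instead of gaining a second look-ahead layer. A small illustrative helper (not part of the patch):

    // Illustrative only: the second wrap now returns the same instance
    // rather than stacking another LookAheadResultIterator on top.
    static PeekingResultIterator wrapTwice(ResultIterator iterator) {
        PeekingResultIterator once = LookAheadResultIterator.wrap(iterator);
        PeekingResultIterator twice = LookAheadResultIterator.wrap(once);
        assert once == twice;
        return twice;
    }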

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
deleted file mode 100644
index 8377b03..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.join;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.ExpressionType;
-import org.apache.phoenix.schema.KeyValueSchema;
-import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.ValueBitSet;
-import org.apache.phoenix.schema.tuple.BaseTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-public class TupleProjector {    
-    public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
-    public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
-    
-    private static final String SCAN_PROJECTOR = "scanProjector";
-    
-    private final KeyValueSchema schema;
-    private final Expression[] expressions;
-    private ValueBitSet valueSet;
-    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-    
-    public TupleProjector(ProjectedPTableWrapper projected) {
-    	List<PColumn> columns = projected.getTable().getColumns();
-    	expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
-    	// we do not count minNullableIndex for we might do later merge.
-    	KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
-    	int i = 0;
-        for (PColumn column : projected.getTable().getColumns()) {
-        	if (!SchemaUtil.isPKColumn(column)) {
-        		builder.addField(column);
-        		expressions[i++] = projected.getSourceExpression(column);
-        	}
-        }
-        schema = builder.build();
-        valueSet = ValueBitSet.newInstance(schema);
-    }
-    
-    private TupleProjector(KeyValueSchema schema, Expression[] expressions) {
-    	this.schema = schema;
-    	this.expressions = expressions;
-    	this.valueSet = ValueBitSet.newInstance(schema);
-    }
-    
-    public void setValueBitSet(ValueBitSet bitSet) {
-        this.valueSet = bitSet;
-    }
-    
-    public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) {
-        ByteArrayOutputStream stream = new ByteArrayOutputStream();
-        try {
-            DataOutputStream output = new DataOutputStream(stream);
-            projector.schema.write(output);
-            int count = projector.expressions.length;
-            WritableUtils.writeVInt(output, count);
-            for (int i = 0; i < count; i++) {
-            	WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
-            	projector.expressions[i].write(output);
-            }
-            scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            try {
-                stream.close();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        
-    }
-    
-    public static TupleProjector deserializeProjectorFromScan(Scan scan) {
-        byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
-        if (proj == null) {
-            return null;
-        }
-        ByteArrayInputStream stream = new ByteArrayInputStream(proj);
-        try {
-            DataInputStream input = new DataInputStream(stream);
-            KeyValueSchema schema = new KeyValueSchema();
-            schema.readFields(input);
-            int count = WritableUtils.readVInt(input);
-            Expression[] expressions = new Expression[count];
-            for (int i = 0; i < count; i++) {
-            	int ordinal = WritableUtils.readVInt(input);
-            	expressions[i] = ExpressionType.values()[ordinal].newInstance();
-            	expressions[i].readFields(input);
-            }
-            return new TupleProjector(schema, expressions);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            try {
-                stream.close();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-    
-    public static class ProjectedValueTuple extends BaseTuple {
-        private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
-        private long timestamp;
-        private byte[] projectedValue;
-        private int bitSetLen;
-        private KeyValue keyValue;
-
-        private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
-            this.keyPtr.set(keyBuffer, keyOffset, keyLength);
-            this.timestamp = timestamp;
-            this.projectedValue = projectedValue;
-            this.bitSetLen = bitSetLen;
-        }
-        
-        public ImmutableBytesWritable getKeyPtr() {
-            return keyPtr;
-        }
-        
-        public long getTimestamp() {
-            return timestamp;
-        }
-        
-        public byte[] getProjectedValue() {
-            return projectedValue;
-        }
-        
-        public int getBitSetLength() {
-            return bitSetLen;
-        }
-        
-        @Override
-        public void getKey(ImmutableBytesWritable ptr) {
-            ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
-        }
-
-        @Override
-        public KeyValue getValue(int index) {
-            if (index != 0) {
-                throw new IndexOutOfBoundsException(Integer.toString(index));
-            }
-            return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
-        }
-
-        @Override
-        public KeyValue getValue(byte[] family, byte[] qualifier) {
-            if (keyValue == null) {
-                keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), 
-                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
-            }
-            return keyValue;
-        }
-
-        @Override
-        public boolean getValue(byte[] family, byte[] qualifier,
-                ImmutableBytesWritable ptr) {
-            ptr.set(projectedValue);
-            return true;
-        }
-
-        @Override
-        public boolean isImmutable() {
-            return true;
-        }
-
-        @Override
-        public int size() {
-            return 1;
-        }
-    }
-    
-    public ProjectedValueTuple projectResults(Tuple tuple) {
-    	byte[] bytesValue = schema.toBytes(tuple, expressions, valueSet, ptr);
-    	KeyValue base = tuple.getValue(0);
-        return new ProjectedValueTuple(base.getBuffer(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
-    }
-    
-    public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
-    	boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
-        if (!b)
-            throw new IOException("Trying to decode a non-projected value.");
-    }
-    
-    public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
-    		Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
-    	ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
-    	destBitSet.clear();
-    	destBitSet.or(destValue);
-    	int origDestBitSetLen = dest.getBitSetLength();
-    	ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
-    	decodeProjectedValue(src, srcValue);
-    	srcBitSet.clear();
-    	srcBitSet.or(srcValue);
-    	int origSrcBitSetLen = srcBitSet.getEstimatedLength();
-    	for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
-    		if (srcBitSet.get(i)) {
-    			destBitSet.set(offset + i);
-    		}
-    	}
-    	int destBitSetLen = destBitSet.getEstimatedLength();
-    	byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
-    	int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
-    	o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
-    	destBitSet.toBytes(merged, o);
-    	ImmutableBytesWritable keyPtr = dest.getKeyPtr();
-        return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
-    }
-    
-    @Override
-    public String toString() {
-        return "TUPLE-PROJECTOR {" + Arrays.toString(expressions) + " ==> " + schema.toString() + "}";
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index 132c831..c7bc944 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -101,6 +101,7 @@ public class QueryOptimizer {
         if (!useIndexes 
                 || select.isJoin() 
                 || dataPlan.getContext().getResolver().getTables().size() > 1
+                || select.getInnerSelectStatement() != null
                 || (dataPlan.getContext().getScanRanges().isPointLookup() && stopAtBestPlan)) {
             return Collections.singletonList(dataPlan);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 2242cd0..5aaf04d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -629,8 +629,8 @@ public class ParseNodeFactory {
                 statement.hasSequence());
     }
 
-    public SelectStatement select(SelectStatement statement, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) {
-        return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), select, where, groupBy,
+    public SelectStatement select(SelectStatement statement, boolean isDistinct, List<AliasedNode> select, ParseNode where, List<ParseNode> groupBy, boolean isAggregate) {
+        return select(statement.getFrom(), statement.getHint(), isDistinct, select, where, groupBy,
                 statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), isAggregate,
                 statement.hasSequence());
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
index 6cee588..e7302dc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatement.java
@@ -192,4 +192,11 @@ public class SelectStatement implements FilterableStatement {
     public boolean isJoin() {
         return fromTable.size() > 1 || (fromTable.size() > 0 && fromTable.get(0) instanceof JoinTableNode);
     }
+    
+    public SelectStatement getInnerSelectStatement() {
+        if (fromTable.size() != 1 || !(fromTable.get(0) instanceof DerivedTableNode))
+            return null;
+        
+        return ((DerivedTableNode) fromTable.get(0)).getSelect();
+    }
 }
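
This helper is what the QueryOptimizer hunk above consults: it is non-null exactly when the FROM clause is a single derived table, which is the query shape that may need the extra client-side aggregation or processing this patch adds. A trivial illustrative check (hypothetical helper, not part of the patch):

    // Illustrative only: mirrors the guard added in QueryOptimizer above.
    static boolean hasSingleDerivedTable(SelectStatement select) {
        // Non-null only for a lone derived table in the FROM clause, e.g.
        //   SELECT a, COUNT(*) FROM (SELECT a, b FROM t GROUP BY a, b) AS sub GROUP BY a
        return select.getInnerSelectStatement() != null;
    }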

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bdc33bc/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
index 0910712..f1a0028 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java
@@ -114,7 +114,7 @@ public final class ColumnRef {
         	return new KeyValueColumnExpression(column, displayName);
         }
         
-        if (table.getType() == PTableType.JOIN) {
+        if (table.getType() == PTableType.JOIN || table.getType() == PTableType.SUBQUERY) {
         	return new ProjectedColumnExpression(column, table, column.getName().getString());
         }