You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/04/13 23:05:07 UTC

[1/2] PHOENIX-927 Support derived tables in joins

Repository: incubator-phoenix
Updated Branches:
  refs/heads/master d6a096f07 -> 16e74c68b


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
index 9f62982..dcac849 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
-import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.PColumn;
@@ -95,7 +95,7 @@ public class ProjectedColumnExpression extends ColumnExpression {
 	@Override
 	public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
         try {
-            ScanProjector.decodeProjectedValue(tuple, ptr);
+            TupleProjector.decodeProjectedValue(tuple, ptr);
             int maxOffset = ptr.getOffset() + ptr.getLength();
             bitSet.clear();
             bitSet.or(ptr);

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
index 43783b8..909e772 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheClient.java
@@ -69,16 +69,16 @@ public class HashCacheClient  {
      * @throws MaxServerCacheSizeExceededException if size of hash cache exceeds max allowed
      * size
      */
-    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef) throws SQLException {
+    public ServerCache addHashCache(ScanRanges keyRanges, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions, TableRef cacheUsingTableRef) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        serialize(ptr, iterator, estimatedSize, onExpressions);
+        serialize(ptr, iterator, projector, estimatedSize, onExpressions);
         return serverCache.addServerCache(keyRanges, ptr, new HashCacheFactory(), cacheUsingTableRef);
     }
     
-    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, long estimatedSize, List<Expression> onExpressions) throws SQLException {
+    private void serialize(ImmutableBytesWritable ptr, ResultIterator iterator, TupleProjector projector, long estimatedSize, List<Expression> onExpressions) throws SQLException {
         long maxSize = serverCache.getConnection().getQueryServices().getProps().getLong(QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
         estimatedSize = Math.min(estimatedSize, maxSize);
         if (estimatedSize > Integer.MAX_VALUE) {
@@ -98,6 +98,9 @@ public class HashCacheClient  {
             int nRows = 0;
             out.writeInt(nRows); // In the end will be replaced with total number of rows            
             for (Tuple result = iterator.next(); result != null; result = iterator.next()) {
+                if (projector != null) {
+                    result = projector.projectResults(result);
+                }
                 TupleUtil.write(result, out);
                 if (baOut.size() > maxSize) {
                     throw new MaxServerCacheSizeExceededException("Size of hash cache (" + baOut.size() + " bytes) exceeds the maximum allowed size (" + maxSize + " bytes)");

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
index c592483..257fa1f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java
@@ -23,7 +23,6 @@ import java.util.*;
 
 import net.jcip.annotations.Immutable;
 
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
index 3386cda..62f8c71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashJoinInfo.java
@@ -45,9 +45,10 @@ public class HashJoinInfo {
     private KeyValueSchema[] schemas;
     private int[] fieldPositions;
     private Expression postJoinFilterExpression;
+    private boolean forceProjection;
     
-    public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression) {
-    	this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression);
+    public HashJoinInfo(PTable joinedTable, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, PTable[] tables, int[] fieldPositions, Expression postJoinFilterExpression, boolean forceProjection) {
+    	this(buildSchema(joinedTable), joinIds, joinExpressions, joinTypes, earlyEvaluation, buildSchemas(tables), fieldPositions, postJoinFilterExpression, forceProjection);
     }
     
     private static KeyValueSchema[] buildSchemas(PTable[] tables) {
@@ -70,7 +71,7 @@ public class HashJoinInfo {
         return builder.build();
     }
     
-    private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression) {
+    private HashJoinInfo(KeyValueSchema joinedSchema, ImmutableBytesPtr[] joinIds, List<Expression>[] joinExpressions, JoinType[] joinTypes, boolean[] earlyEvaluation, KeyValueSchema[] schemas, int[] fieldPositions, Expression postJoinFilterExpression, boolean forceProjection) {
     	this.joinedSchema = joinedSchema;
     	this.joinIds = joinIds;
         this.joinExpressions = joinExpressions;
@@ -79,6 +80,7 @@ public class HashJoinInfo {
         this.schemas = schemas;
         this.fieldPositions = fieldPositions;
         this.postJoinFilterExpression = postJoinFilterExpression;
+        this.forceProjection = forceProjection;
     }
     
     public KeyValueSchema getJoinedSchema() {
@@ -113,6 +115,14 @@ public class HashJoinInfo {
         return postJoinFilterExpression;
     }
     
+    /*
+     * If the LHS table is a sub-select, we always do projection, since
+     * the ON expressions reference only projected columns.
+     */
+    public boolean forceProjection() {
+        return forceProjection;
+    }
+    
     public static void serializeHashJoinIntoScan(Scan scan, HashJoinInfo joinInfo) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream();
         try {
@@ -138,6 +148,7 @@ public class HashJoinInfo {
             } else {
                 WritableUtils.writeVInt(output, -1);
             }
+            output.writeBoolean(joinInfo.forceProjection);
             scan.setAttribute(HASH_JOIN, stream.toByteArray());
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -193,7 +204,8 @@ public class HashJoinInfo {
                 postJoinFilterExpression = ExpressionType.values()[expressionOrdinal].newInstance();
                 postJoinFilterExpression.readFields(input);
             }
-            return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression);
+            boolean forceProjection = input.readBoolean();
+            return new HashJoinInfo(joinedSchema, joinIds, joinExpressions, joinTypes, earlyEvaluation, schemas, fieldPositions, postJoinFilterExpression, forceProjection);
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
deleted file mode 100644
index f43798e..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/join/ScanProjector.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.join;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.ExpressionType;
-import org.apache.phoenix.schema.KeyValueSchema;
-import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.ValueBitSet;
-import org.apache.phoenix.schema.tuple.BaseTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.KeyValueUtil;
-import org.apache.phoenix.util.SchemaUtil;
-
-public class ScanProjector {    
-    public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
-    public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
-    
-    private static final String SCAN_PROJECTOR = "scanProjector";
-    
-    private final KeyValueSchema schema;
-    private final Expression[] expressions;
-    private ValueBitSet valueSet;
-    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-    
-    public ScanProjector(ProjectedPTableWrapper projected) {
-    	List<PColumn> columns = projected.getTable().getColumns();
-    	expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
-    	// we do not count minNullableIndex for we might do later merge.
-    	KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
-    	int i = 0;
-        for (PColumn column : projected.getTable().getColumns()) {
-        	if (!SchemaUtil.isPKColumn(column)) {
-        		builder.addField(column);
-        		expressions[i++] = projected.getSourceExpression(column);
-        	}
-        }
-        schema = builder.build();
-        valueSet = ValueBitSet.newInstance(schema);
-    }
-    
-    private ScanProjector(KeyValueSchema schema, Expression[] expressions) {
-    	this.schema = schema;
-    	this.expressions = expressions;
-    	this.valueSet = ValueBitSet.newInstance(schema);
-    }
-    
-    public void setValueBitSet(ValueBitSet bitSet) {
-        this.valueSet = bitSet;
-    }
-    
-    public static void serializeProjectorIntoScan(Scan scan, ScanProjector projector) {
-        ByteArrayOutputStream stream = new ByteArrayOutputStream();
-        try {
-            DataOutputStream output = new DataOutputStream(stream);
-            projector.schema.write(output);
-            int count = projector.expressions.length;
-            WritableUtils.writeVInt(output, count);
-            for (int i = 0; i < count; i++) {
-            	WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
-            	projector.expressions[i].write(output);
-            }
-            scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            try {
-                stream.close();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-        
-    }
-    
-    public static ScanProjector deserializeProjectorFromScan(Scan scan) {
-        byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
-        if (proj == null) {
-            return null;
-        }
-        ByteArrayInputStream stream = new ByteArrayInputStream(proj);
-        try {
-            DataInputStream input = new DataInputStream(stream);
-            KeyValueSchema schema = new KeyValueSchema();
-            schema.readFields(input);
-            int count = WritableUtils.readVInt(input);
-            Expression[] expressions = new Expression[count];
-            for (int i = 0; i < count; i++) {
-            	int ordinal = WritableUtils.readVInt(input);
-            	expressions[i] = ExpressionType.values()[ordinal].newInstance();
-            	expressions[i].readFields(input);
-            }
-            return new ScanProjector(schema, expressions);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            try {
-                stream.close();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-    
-    public static class ProjectedValueTuple extends BaseTuple {
-        private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
-        private long timestamp;
-        private byte[] projectedValue;
-        private int bitSetLen;
-        private KeyValue keyValue;
-
-        private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
-            this.keyPtr.set(keyBuffer, keyOffset, keyLength);
-            this.timestamp = timestamp;
-            this.projectedValue = projectedValue;
-            this.bitSetLen = bitSetLen;
-        }
-        
-        public ImmutableBytesWritable getKeyPtr() {
-            return keyPtr;
-        }
-        
-        public long getTimestamp() {
-            return timestamp;
-        }
-        
-        public byte[] getProjectedValue() {
-            return projectedValue;
-        }
-        
-        public int getBitSetLength() {
-            return bitSetLen;
-        }
-        
-        @Override
-        public void getKey(ImmutableBytesWritable ptr) {
-            ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
-        }
-
-        @Override
-        public KeyValue getValue(int index) {
-            if (index != 0) {
-                throw new IndexOutOfBoundsException(Integer.toString(index));
-            }
-            return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
-        }
-
-        @Override
-        public KeyValue getValue(byte[] family, byte[] qualifier) {
-            if (keyValue == null) {
-                keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), 
-                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
-            }
-            return keyValue;
-        }
-
-        @Override
-        public boolean getValue(byte[] family, byte[] qualifier,
-                ImmutableBytesWritable ptr) {
-            ptr.set(projectedValue);
-            return true;
-        }
-
-        @Override
-        public boolean isImmutable() {
-            return true;
-        }
-
-        @Override
-        public int size() {
-            return 1;
-        }
-    }
-    
-    public ProjectedValueTuple projectResults(Tuple tuple) {
-    	byte[] bytesValue = schema.toBytes(tuple, expressions, valueSet, ptr);
-    	Cell base = tuple.getValue(0);
-        return new ProjectedValueTuple(base.getRowArray(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
-    }
-    
-    public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
-    	boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
-        if (!b)
-            throw new IOException("Trying to decode a non-projected value.");
-    }
-    
-    public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
-    		Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
-    	ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
-    	destBitSet.clear();
-    	destBitSet.or(destValue);
-    	int origDestBitSetLen = dest.getBitSetLength();
-    	ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
-    	decodeProjectedValue(src, srcValue);
-    	srcBitSet.clear();
-    	srcBitSet.or(srcValue);
-    	int origSrcBitSetLen = srcBitSet.getEstimatedLength();
-    	for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
-    		if (srcBitSet.get(i)) {
-    			destBitSet.set(offset + i);
-    		}
-    	}
-    	int destBitSetLen = destBitSet.getEstimatedLength();
-    	byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
-    	int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
-    	o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
-    	destBitSet.toBytes(merged, o);
-    	ImmutableBytesWritable keyPtr = dest.getKeyPtr();
-        return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
new file mode 100644
index 0000000..c41125a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/join/TupleProjector.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.join;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.tuple.BaseTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+public class TupleProjector {    
+    public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v");
+    public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0];
+    
+    private static final String SCAN_PROJECTOR = "scanProjector";
+    
+    private final KeyValueSchema schema;
+    private final Expression[] expressions;
+    private ValueBitSet valueSet;
+    private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+    
+    public TupleProjector(ProjectedPTableWrapper projected) {
+    	List<PColumn> columns = projected.getTable().getColumns();
+    	expressions = new Expression[columns.size() - projected.getTable().getPKColumns().size()];
+    	// we do not count minNullableIndex for we might do later merge.
+    	KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+    	int i = 0;
+        for (PColumn column : projected.getTable().getColumns()) {
+        	if (!SchemaUtil.isPKColumn(column)) {
+        		builder.addField(column);
+        		expressions[i++] = projected.getSourceExpression(column);
+        	}
+        }
+        schema = builder.build();
+        valueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    private TupleProjector(KeyValueSchema schema, Expression[] expressions) {
+    	this.schema = schema;
+    	this.expressions = expressions;
+    	this.valueSet = ValueBitSet.newInstance(schema);
+    }
+    
+    public void setValueBitSet(ValueBitSet bitSet) {
+        this.valueSet = bitSet;
+    }
+    
+    public static void serializeProjectorIntoScan(Scan scan, TupleProjector projector) {
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        try {
+            DataOutputStream output = new DataOutputStream(stream);
+            projector.schema.write(output);
+            int count = projector.expressions.length;
+            WritableUtils.writeVInt(output, count);
+            for (int i = 0; i < count; i++) {
+            	WritableUtils.writeVInt(output, ExpressionType.valueOf(projector.expressions[i]).ordinal());
+            	projector.expressions[i].write(output);
+            }
+            scan.setAttribute(SCAN_PROJECTOR, stream.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        
+    }
+    
+    public static TupleProjector deserializeProjectorFromScan(Scan scan) {
+        byte[] proj = scan.getAttribute(SCAN_PROJECTOR);
+        if (proj == null) {
+            return null;
+        }
+        ByteArrayInputStream stream = new ByteArrayInputStream(proj);
+        try {
+            DataInputStream input = new DataInputStream(stream);
+            KeyValueSchema schema = new KeyValueSchema();
+            schema.readFields(input);
+            int count = WritableUtils.readVInt(input);
+            Expression[] expressions = new Expression[count];
+            for (int i = 0; i < count; i++) {
+            	int ordinal = WritableUtils.readVInt(input);
+            	expressions[i] = ExpressionType.values()[ordinal].newInstance();
+            	expressions[i].readFields(input);
+            }
+            return new TupleProjector(schema, expressions);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } finally {
+            try {
+                stream.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    public static class ProjectedValueTuple extends BaseTuple {
+        private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
+        private long timestamp;
+        private byte[] projectedValue;
+        private int bitSetLen;
+        private KeyValue keyValue;
+
+        private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+            this.keyPtr.set(keyBuffer, keyOffset, keyLength);
+            this.timestamp = timestamp;
+            this.projectedValue = projectedValue;
+            this.bitSetLen = bitSetLen;
+        }
+        
+        public ImmutableBytesWritable getKeyPtr() {
+            return keyPtr;
+        }
+        
+        public long getTimestamp() {
+            return timestamp;
+        }
+        
+        public byte[] getProjectedValue() {
+            return projectedValue;
+        }
+        
+        public int getBitSetLength() {
+            return bitSetLen;
+        }
+        
+        @Override
+        public void getKey(ImmutableBytesWritable ptr) {
+            ptr.set(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength());
+        }
+
+        @Override
+        public KeyValue getValue(int index) {
+            if (index != 0) {
+                throw new IndexOutOfBoundsException(Integer.toString(index));
+            }
+            return getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER);
+        }
+
+        @Override
+        public KeyValue getValue(byte[] family, byte[] qualifier) {
+            if (keyValue == null) {
+                keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), 
+                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+            }
+            return keyValue;
+        }
+
+        @Override
+        public boolean getValue(byte[] family, byte[] qualifier,
+                ImmutableBytesWritable ptr) {
+            ptr.set(projectedValue);
+            return true;
+        }
+
+        @Override
+        public boolean isImmutable() {
+            return true;
+        }
+
+        @Override
+        public int size() {
+            return 1;
+        }
+    }
+    
+    public ProjectedValueTuple projectResults(Tuple tuple) {
+    	byte[] bytesValue = schema.toBytes(tuple, expressions, valueSet, ptr);
+    	Cell base = tuple.getValue(0);
+        return new ProjectedValueTuple(base.getRowArray(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+    }
+    
+    public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
+    	boolean b = tuple.getValue(VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, ptr);
+        if (!b)
+            throw new IOException("Trying to decode a non-projected value.");
+    }
+    
+    public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
+    		Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
+    	ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue()); // dest field bytes followed by dest's trailing bit set
+    	destBitSet.clear();
+    	destBitSet.or(destValue);
+    	int origDestBitSetLen = dest.getBitSetLength();
+    	ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
+    	decodeProjectedValue(src, srcValue); // throws IOException if src carries no projected value
+    	srcBitSet.clear();
+    	srcBitSet.or(srcValue);
+    	int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+    	for (int i = 0; i <= srcBitSet.getMaxSetBit(); i++) { // NOTE(review): bound is inclusive on the assumption that getMaxSetBit() is the index of the highest set bit (with '<' that bit was never copied) - confirm against ValueBitSet
+    		if (srcBitSet.get(i)) {
+    			destBitSet.set(offset + i); // src bit i lands at position offset + i in the merged bit set
+    		}
+    	}
+    	int destBitSetLen = destBitSet.getEstimatedLength();
+    	byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen]; // dest fields + src fields + one merged bit set
+    	int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen); // dest fields without dest's old bit set
+    	o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen); // src fields without src's bit set
+    	destBitSet.toBytes(merged, o); // merged bit set is written last
+    	ImmutableBytesWritable keyPtr = dest.getKeyPtr();
+        return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen); // row key and timestamp are taken from dest
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index b275e3d..76276e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -80,7 +80,7 @@ public class QueryOptimizer {
         // TODO: the recompile for the index tables could skip the normalize step
         SelectStatement select = (SelectStatement)dataPlan.getStatement();
         // TODO: consider not even compiling index plans if we have a point lookup
-        if (!useIndexes || select.isJoin()) {
+        if (!useIndexes || select.isJoin() || dataPlan.getContext().getResolver().getTables().size() > 1) {
             return dataPlan;
         }
         PTable dataTable = dataPlan.getTableRef().getTable();

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
index bff5834..19d0570 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeRewriter.java
@@ -204,6 +204,10 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
         this.nodeCount = 0;
     }
     
+    protected boolean visitDerivedTableNode() { // hook checked by visit(DerivedTableNode): when it returns false the derived table node is returned unchanged
+        return true; // default: the derived table's inner SELECT is rewritten too
+    }
+    
     private static interface CompoundNodeFactory {
         ParseNode createNode(List<ParseNode> children);
     }
@@ -528,7 +532,15 @@ public class ParseNodeRewriter extends TraverseAllParseNodeVisitor<ParseNode> {
 
         @Override
         public TableNode visit(DerivedTableNode subselectNode) throws SQLException {
-            return subselectNode;
+            if (!parseNodeRewriter.visitDerivedTableNode()) // the rewriter opted out of descending into derived tables
+                return subselectNode;
+            
+            SelectStatement select = subselectNode.getSelect();
+            SelectStatement normSelect = rewrite(select, parseNodeRewriter); // rewrite the inner SELECT with the same rewriter
+            if (select == normSelect) // identity check: rewrite() handed back the same instance, so keep the existing node
+                return subselectNode;
+            
+            return NODE_FACTORY.derivedTable(subselectNode.getAlias(), normSelect); // new node: original alias, rewritten SELECT
         }
 	    
 	}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java
index 2de7793..23ba829 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableType.java
@@ -27,7 +27,8 @@ public enum PTableType {
     TABLE("u", "TABLE"),
     VIEW("v", "VIEW"),
     INDEX("i", "INDEX"),
-    JOIN("j", "JOIN"); 
+    JOIN("j", "JOIN"),
+    SUBQUERY("q", "SUBQUERY"); 
 
     private final PName value;
     private final String serializedValue;


[2/2] git commit: PHOENIX-927 Support derived tables in joins

Posted by ma...@apache.org.
PHOENIX-927 Support derived tables in joins


Project: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/commit/16e74c68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/tree/16e74c68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-phoenix/diff/16e74c68

Branch: refs/heads/master
Commit: 16e74c68b15232242f54a2fa2b25649264f513f3
Parents: d6a096f
Author: maryannxue <ma...@apache.org>
Authored: Sun Apr 13 17:04:36 2014 -0400
Committer: maryannxue <ma...@apache.org>
Committed: Sun Apr 13 17:04:36 2014 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/end2end/HashJoinIT.java  | 489 +++++++++++++++++++
 .../apache/phoenix/compile/FromCompiler.java    |  39 +-
 .../phoenix/compile/IndexStatementRewriter.java |   5 +
 .../apache/phoenix/compile/JoinCompiler.java    | 209 ++++++--
 .../apache/phoenix/compile/QueryCompiler.java   |  92 +++-
 .../phoenix/compile/StatementContext.java       |  10 +
 .../phoenix/compile/StatementNormalizer.java    |   4 +-
 .../GroupedAggregateRegionObserver.java         |   4 +-
 .../coprocessor/HashJoinRegionScanner.java      |  20 +-
 .../phoenix/coprocessor/ScanRegionObserver.java |   4 +-
 .../UngroupedAggregateRegionObserver.java       |   4 +-
 .../apache/phoenix/execute/HashJoinPlan.java    |  23 +-
 .../expression/ProjectedColumnExpression.java   |   4 +-
 .../apache/phoenix/join/HashCacheClient.java    |   9 +-
 .../apache/phoenix/join/HashCacheFactory.java   |   1 -
 .../org/apache/phoenix/join/HashJoinInfo.java   |  20 +-
 .../org/apache/phoenix/join/ScanProjector.java  | 241 ---------
 .../org/apache/phoenix/join/TupleProjector.java | 242 +++++++++
 .../apache/phoenix/optimize/QueryOptimizer.java |   2 +-
 .../apache/phoenix/parse/ParseNodeRewriter.java |  14 +-
 .../org/apache/phoenix/schema/PTableType.java   |   3 +-
 21 files changed, 1106 insertions(+), 333 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
index 5f02381..41b673b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinIT.java
@@ -307,6 +307,103 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "                    BUILD HASH TABLE 0\n" +
                 "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+                /* 
+                 * testJoinWithSubqueryAndAggregation()
+                 *     SELECT i.name, sum(quantity) FROM joinOrderTable o 
+                 *     LEFT JOIN (SELECT name, item_id iid FROM joinItemTable) AS i 
+                 *     ON o.item_id = i.iid 
+                 *     GROUP BY i.name ORDER BY i.name
+                 */     
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT SORTED BY [I.NAME]\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME,
+                /* 
+                 * testJoinWithSubqueryAndAggregation()
+                 *     SELECT o.iid, sum(o.quantity) q 
+                 *     FROM (SELECT item_id iid, quantity FROM joinOrderTable) AS o 
+                 *     LEFT JOIN (SELECT item_id FROM joinItemTable) AS i 
+                 *     ON o.iid = i.item_id 
+                 *     GROUP BY o.iid ORDER BY q DESC                 
+                 */     
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [O.IID]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT SORTED BY [SUM(O.QUANTITY) DESC]\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0 (SKIP MERGE)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER FILTER BY FIRST KEY ONLY",
+                /* 
+                 * testJoinWithSubqueryAndAggregation()
+                 *     SELECT i.iid, o.q 
+                 *     FROM (SELECT item_id iid FROM joinItemTable) AS i 
+                 *     LEFT JOIN (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o 
+                 *     ON o.iid = i.iid 
+                 *     ORDER BY o.q DESC NULLS LAST, i.iid
+                 */     
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                "    SERVER SORTED BY [O.Q DESC NULLS LAST, I.IID]\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER AGGREGATE INTO DISTINCT ROWS BY [item_id]\n" +
+                "        CLIENT MERGE SORT",
+                /* 
+                 * testJoinWithSubqueryAndAggregation()
+                 *     SELECT i.iid, o.q 
+                 *     FROM (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o 
+                 *     JOIN (SELECT item_id iid FROM joinItemTable) AS i 
+                 *     ON o.iid = i.iid 
+                 *     ORDER BY o.q DESC, i.iid
+                 */     
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                "    SERVER SORTED BY [O.Q DESC, I.IID]\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER AGGREGATE INTO DISTINCT ROWS BY [item_id]\n" +
+                "        CLIENT MERGE SORT",
+                /*
+                 * testNestedSubqueries()
+                 *     SELECT * FROM (SELECT customer_id cid, name, phone, address, loc_id, date FROM joinCustomerTable) AS c 
+                 *     INNER JOIN (SELECT o.oid ooid, o.cid ocid, o.iid oiid, o.price oprice, o.quantity oquantity, o.date odate, 
+                 *     qi.iiid iiid, qi.iname iname, qi.iprice iprice, qi.idiscount1 idiscount1, qi.idiscount2 idiscount2, qi.isid isid, qi.idescription idescription, 
+                 *     qi.ssid ssid, qi.sname sname, qi.sphone sphone, qi.saddress saddress, qi.sloc_id sloc_id 
+                 *         FROM (SELECT item_id iid, customer_id cid, order_id oid, price, quantity, date FROM joinOrderTable) AS o 
+                 *         INNER JOIN (SELECT i.iid iiid, i.name iname, i.price iprice, i.discount1 idiscount1, i.discount2 idiscount2, i.sid isid, i.description idescription, 
+                 *         s.sid ssid, s.name sname, s.phone sphone, s.address saddress, s.loc_id sloc_id 
+                 *             FROM (SELECT supplier_id sid, name, phone, address, loc_id FROM joinSupplierTable) AS s 
+                 *             RIGHT JOIN (SELECT item_id iid, name, price, discount1, discount2, supplier_id sid, description FROM joinItemTable) AS i 
+                 *             ON i.sid = s.sid) as qi 
+                 *         ON o.iid = qi.iiid) as qo 
+                 *     ON c.cid = qo.ocid 
+                 *     WHERE c.cid <= '0000000005' 
+                 *         AND qo.ooid != '000000000000003' 
+                 *         AND qo.iname != 'T3' 
+                 *     ORDER BY c.cid, qo.iname
+                 */
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_CUSTOMER_TABLE_DISPLAY_NAME + " [*] - ['0000000005']\n" +
+                "    SERVER SORTED BY [C.CID, QO.INAME]\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER FILTER BY order_id != '000000000000003'\n" +
+                "            PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "            BUILD HASH TABLE 0\n" +
+                "                CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "                    SERVER FILTER BY NAME != 'T3'\n" +
+                "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "                    BUILD HASH TABLE 0\n" +
+                "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
                 }});
         testCases.add(new String[][] {
                 {
@@ -515,6 +612,104 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
                 "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
                 "                    BUILD HASH TABLE 0\n" +
                 "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
+                /* 
+                 * testJoinWithSubqueryAndAggregation()
+                 *     SELECT i.name, sum(quantity) FROM joinOrderTable o 
+                 *     LEFT JOIN (SELECT name, item_id iid FROM joinItemTable) AS i 
+                 *     ON o.item_id = i.iid 
+                 *     GROUP BY i.name ORDER BY i.name
+                 */     
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [I.NAME]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT SORTED BY [I.NAME]\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+                "            SERVER FILTER BY FIRST KEY ONLY",
+                /* 
+                 * testJoinWithSubqueryAndAggregation()
+                 *     SELECT o.iid, sum(o.quantity) q 
+                 *     FROM (SELECT item_id iid, quantity FROM joinOrderTable) AS o 
+                 *     LEFT JOIN (SELECT item_id FROM joinItemTable) AS i 
+                 *     ON o.iid = i.item_id 
+                 *     GROUP BY o.iid ORDER BY q DESC                 
+                 */     
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "    SERVER AGGREGATE INTO DISTINCT ROWS BY [O.IID]\n" +
+                "CLIENT MERGE SORT\n" +
+                "CLIENT SORTED BY [SUM(O.QUANTITY) DESC]\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0 (SKIP MERGE)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+                "            SERVER FILTER BY FIRST KEY ONLY",
+                /* 
+                 * testJoinWithSubqueryAndAggregation()
+                 *     SELECT i.iid, o.q 
+                 *     FROM (SELECT item_id iid FROM joinItemTable) AS i 
+                 *     LEFT JOIN (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o 
+                 *     ON o.iid = i.iid 
+                 *     ORDER BY o.q DESC NULLS LAST, i.iid
+                 */     
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                "    SERVER SORTED BY [O.Q DESC NULLS LAST, I.IID]\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER AGGREGATE INTO DISTINCT ROWS BY [item_id]\n" +
+                "        CLIENT MERGE SORT",
+                /* 
+                 * testJoinWithSubqueryAndAggregation()
+                 *     SELECT i.iid, o.q 
+                 *     FROM (SELECT item_id iid, sum(quantity) q FROM joinOrderTable GROUP BY item_id) AS o 
+                 *     JOIN (SELECT item_id iid FROM joinItemTable) AS i 
+                 *     ON o.iid = i.iid 
+                 *     ORDER BY o.q DESC, i.iid
+                 */     
+                "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+                "    SERVER FILTER BY FIRST KEY ONLY\n" +
+                "    SERVER SORTED BY [O.Q DESC, I.IID]\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER AGGREGATE INTO DISTINCT ROWS BY [item_id]\n" +
+                "        CLIENT MERGE SORT",
+                /*
+                 * testNestedSubqueries()
+                 *     SELECT * FROM (SELECT customer_id cid, name, phone, address, loc_id, date FROM joinCustomerTable) AS c 
+                 *     INNER JOIN (SELECT o.oid ooid, o.cid ocid, o.iid oiid, o.price oprice, o.quantity oquantity, o.date odate, 
+                 *     qi.iiid iiid, qi.iname iname, qi.iprice iprice, qi.idiscount1 idiscount1, qi.idiscount2 idiscount2, qi.isid isid, qi.idescription idescription, 
+                 *     qi.ssid ssid, qi.sname sname, qi.sphone sphone, qi.saddress saddress, qi.sloc_id sloc_id 
+                 *         FROM (SELECT item_id iid, customer_id cid, order_id oid, price, quantity, date FROM joinOrderTable) AS o 
+                 *         INNER JOIN (SELECT i.iid iiid, i.name iname, i.price iprice, i.discount1 idiscount1, i.discount2 idiscount2, i.sid isid, i.description idescription, 
+                 *         s.sid ssid, s.name sname, s.phone sphone, s.address saddress, s.loc_id sloc_id 
+                 *             FROM (SELECT supplier_id sid, name, phone, address, loc_id FROM joinSupplierTable) AS s 
+                 *             RIGHT JOIN (SELECT item_id iid, name, price, discount1, discount2, supplier_id sid, description FROM joinItemTable) AS i 
+                 *             ON i.sid = s.sid) as qi 
+                 *         ON o.iid = qi.iiid) as qo 
+                 *     ON c.cid = qo.ocid 
+                 *     WHERE c.cid <= '0000000005' 
+                 *         AND qo.ooid != '000000000000003' 
+                 *         AND qo.iname != 'T3' 
+                 *     ORDER BY c.cid, qo.iname
+                 */
+                "CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + JOIN_CUSTOMER_TABLE_DISPLAY_NAME + " [*] - ['0000000005']\n" +
+                "    SERVER SORTED BY [C.CID, QO.INAME]\n" +
+                "CLIENT MERGE SORT\n" +
+                "    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "    BUILD HASH TABLE 0\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER FILTER BY order_id != '000000000000003'\n" +
+                "            PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "            BUILD HASH TABLE 0\n" +
+                "                CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+                "                    SERVER FILTER BY NAME != 'T3'\n" +
+                "                    PARALLEL EQUI-JOIN 1 HASH TABLES:\n" +
+                "                    BUILD HASH TABLE 0\n" +
+                "                        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME,
                 }});
         return testCases;
     }
@@ -2529,6 +2724,300 @@ public class HashJoinIT extends BaseHBaseManagedTimeIT {
             conn.close();
         }
     }
+    
+    @Test
+    public void testJoinWithSubquery() throws Exception { // inner joins whose right-hand side is a derived table (sub-select with alias "supp")
+        String query1 = "SELECT item.\"item_id\", item.name, supp.sid, supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN (SELECT \"supplier_id\" sid, name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " WHERE name BETWEEN 'S1' AND 'S5') AS supp ON item.\"supplier_id\" = supp.sid"; // filter lives inside the derived table; supplier_id is exposed as alias sid
+        String query2 = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN (SELECT \"supplier_id\", name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + ") AS supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.name = 'S1' OR supp.name = 'S5')"; // derived table is unfiltered; the name predicate sits in the ON clause
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001"); // NOTE(review): arguments are (actual, expected) — reversed relative to JUnit's assertEquals(expected, actual), if that is the assert in use; only failure messages are affected
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+
+            assertFalse(rs.next()); // query1 must yield exactly the five rows checked above
+            
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+
+            assertFalse(rs.next()); // query2 must yield exactly the three rows checked above
+        } finally {
+            conn.close(); // the connection is closed even when an assertion above fails
+        }
+    }
+    
+    @Test
+    public void testJoinWithSubqueryAndAggregation() throws Exception { // derived tables in joins combined with GROUP BY/sum(); each query's rows and its EXPLAIN text are checked
+        String query1 = "SELECT i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN (SELECT name, \"item_id\" iid FROM " 
+            + JOIN_ITEM_TABLE_FULL_NAME + ") AS i ON o.\"item_id\" = i.iid GROUP BY i.name ORDER BY i.name"; // query1: plain table LEFT JOIN derived table, aggregation in the outer query
+        String query2 = "SELECT o.iid, sum(o.quantity) q FROM (SELECT \"item_id\" iid, quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + ") AS o LEFT JOIN (SELECT \"item_id\" FROM " 
+                + JOIN_ITEM_TABLE_FULL_NAME + ") AS i ON o.iid = i.\"item_id\" GROUP BY o.iid ORDER BY q DESC"; // query2: both join sides are derived tables, aggregation in the outer query
+        String query3 = "SELECT i.iid, o.q FROM (SELECT \"item_id\" iid FROM " + JOIN_ITEM_TABLE_FULL_NAME + ") AS i LEFT JOIN (SELECT \"item_id\" iid, sum(quantity) q FROM " 
+                + JOIN_ORDER_TABLE_FULL_NAME + " GROUP BY \"item_id\") AS o ON o.iid = i.iid ORDER BY o.q DESC NULLS LAST, i.iid"; // query3: aggregation inside the right-hand derived table of a LEFT JOIN
+        String query4 = "SELECT i.iid, o.q FROM (SELECT \"item_id\" iid, sum(quantity) q FROM " + JOIN_ORDER_TABLE_FULL_NAME + " GROUP BY \"item_id\") AS o JOIN (SELECT \"item_id\" iid FROM " 
+                + JOIN_ITEM_TABLE_FULL_NAME + ") AS i ON o.iid = i.iid ORDER BY o.q DESC, i.iid"; // query4: aggregated derived table on the left side of an inner JOIN
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T1"); // NOTE(review): arguments are (actual, expected) — reversed relative to JUnit's assertEquals(expected, actual), if that is the assert in use; only failure messages are affected
+            assertEquals(rs.getInt(2), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T2");
+            assertEquals(rs.getInt(2), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T3");
+            assertEquals(rs.getInt(2), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T6");
+            assertEquals(rs.getInt(2), 6000);
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query1);
+            assertEquals(plans[14], QueryUtil.getExplainPlan(rs)); // plans[14] holds the expected EXPLAIN text for query1
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString("o.iid"), "0000000006"); // from here on columns are read by label (alias-qualified, e.g. "o.iid") rather than by index
+            assertEquals(rs.getInt("q"), 6000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("o.iid"), "0000000003");
+            assertEquals(rs.getInt("q"), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("o.iid"), "0000000002");
+            assertEquals(rs.getInt("q"), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("o.iid"), "0000000001");
+            assertEquals(rs.getInt("q"), 1000);
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query2);
+            assertEquals(plans[15], QueryUtil.getExplainPlan(rs)); // plans[15] holds the expected EXPLAIN text for query2
+            
+            statement = conn.prepareStatement(query3);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000006");
+            assertEquals(rs.getInt("o.q"), 6000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000003");
+            assertEquals(rs.getInt("o.q"), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000002");
+            assertEquals(rs.getInt("o.q"), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000001");
+            assertEquals(rs.getInt("o.q"), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000004");
+            assertEquals(rs.getInt("o.q"), 0); // presumably o.q is SQL NULL here (item without orders in a LEFT JOIN, sorted NULLS LAST); JDBC getInt reports NULL as 0 — TODO confirm
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000005");
+            assertEquals(rs.getInt("o.q"), 0);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "invalid001");
+            assertEquals(rs.getInt("o.q"), 0);
+
+            assertFalse(rs.next());
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query3);
+            assertEquals(plans[16], QueryUtil.getExplainPlan(rs)); // plans[16] holds the expected EXPLAIN text for query3
+            
+            statement = conn.prepareStatement(query4);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000006");
+            assertEquals(rs.getInt("o.q"), 6000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000003");
+            assertEquals(rs.getInt("o.q"), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000002");
+            assertEquals(rs.getInt("o.q"), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("i.iid"), "0000000001");
+            assertEquals(rs.getInt("o.q"), 1000);
+
+            assertFalse(rs.next()); // inner JOIN: only the four items that have orders are returned
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query4);
+            assertEquals(plans[17], QueryUtil.getExplainPlan(rs)); // plans[17] holds the expected EXPLAIN text for query4
+        } finally {
+            conn.close(); // the connection is closed even when an assertion above fails
+        }
+    }
+    
+    @Test
+    public void testNestedSubqueries() throws Exception {
+        String query1 = "SELECT q.iname, count(c.name), min(q.sname), max(o.quantity) FROM (SELECT \"customer_id\" cid, \"item_id\" iid, quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + ") AS o LEFT JOIN " 
+                + "(SELECT i.iid iid, s.name sname, i.name iname FROM (SELECT \"supplier_id\" sid, name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + ") AS s RIGHT JOIN (SELECT \"item_id\" iid, name, \"supplier_id\" sid FROM " + JOIN_ITEM_TABLE_FULL_NAME + ") AS i ON i.sid = s.sid) AS q" 
+                + " ON o.iid = q.iid LEFT JOIN (SELECT \"customer_id\" cid, name FROM " 
+                + JOIN_CUSTOMER_TABLE_FULL_NAME + ") AS c ON c.cid = o.cid GROUP BY q.iname ORDER BY q.iname";
+        String query2 = "SELECT * FROM (SELECT \"customer_id\" cid, name, phone, address, loc_id, date FROM " + JOIN_CUSTOMER_TABLE_FULL_NAME + ") AS c INNER JOIN " 
+                + "(SELECT o.oid ooid, o.cid ocid, o.iid oiid, o.price oprice, o.quantity oquantity, o.date odate, qi.iiid iiid, qi.iname iname, qi.iprice iprice, qi.idiscount1 idiscount1, qi.idiscount2 idiscount2, qi.isid isid, qi.idescription idescription, qi.ssid ssid, qi.sname sname, qi.sphone sphone, qi.saddress saddress, qi.sloc_id sloc_id FROM (SELECT \"item_id\" iid, \"customer_id\" cid, \"order_id\" oid, price, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + ") AS o INNER JOIN " 
+                + "(SELECT i.iid iiid, i.name iname, i.price iprice, i.discount1 idiscount1, i.discount2 idiscount2, i.sid isid, i.description idescription, s.sid ssid, s.name sname, s.phone sphone, s.address saddress, s.loc_id sloc_id FROM (SELECT \"supplier_id\" sid, name, phone, address, loc_id FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + ") AS s RIGHT JOIN (SELECT \"item_id\" iid, name, price, discount1, discount2, \"supplier_id\" sid, description FROM " + JOIN_ITEM_TABLE_FULL_NAME + ") AS i ON i.sid = s.sid) as qi" 
+                + " ON o.iid = qi.iiid) as qo ON c.cid = qo.ocid" 
+                + " WHERE c.cid <= '0000000005' AND qo.ooid != '000000000000003' AND qo.iname != 'T3' ORDER BY c.cid, qo.iname";
+        Properties props = new Properties(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T1");
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T2");
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T3");
+            assertEquals(rs.getInt(2), 1);
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T6");
+            assertEquals(rs.getInt(2), 2);
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+
+            assertFalse(rs.next());
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue(rs.next());
+            assertEquals(rs.getString("c.cid"), "0000000003");
+            assertEquals(rs.getString("c.name"), "C3");
+            assertEquals(rs.getString("c.phone"), "999-999-3333");
+            assertEquals(rs.getString("c.address"), "303 XXX Street");
+            assertNull(rs.getString("c.loc_id"));
+            assertEquals(rs.getDate("c.date"), new Date(format.parse("2013-11-25 10:06:29").getTime()));
+            assertEquals(rs.getString("qo.ooid"), "000000000000002");
+            assertEquals(rs.getString("qo.ocid"), "0000000003");
+            assertEquals(rs.getString("qo.oiid"), "0000000006");
+            assertEquals(rs.getInt("qo.oprice"), 552);
+            assertEquals(rs.getInt("qo.oquantity"), 2000);
+            assertEquals(rs.getTimestamp("qo.odate"), new Timestamp(format.parse("2013-11-25 10:06:29").getTime()));
+            assertEquals(rs.getString("qo.iiid"), "0000000006");
+            assertEquals(rs.getString("qo.iname"), "T6");
+            assertEquals(rs.getInt("qo.iprice"), 600);
+            assertEquals(rs.getInt("qo.idiscount1"), 8);
+            assertEquals(rs.getInt("qo.idiscount2"), 15);
+            assertEquals(rs.getString("qo.isid"), "0000000006");
+            assertEquals(rs.getString("qo.idescription"), "Item T6");
+            assertEquals(rs.getString("qo.ssid"), "0000000006");
+            assertEquals(rs.getString("qo.sname"), "S6");
+            assertEquals(rs.getString("qo.sphone"), "888-888-6666");
+            assertEquals(rs.getString("qo.saddress"), "606 YYY Street");
+            assertEquals(rs.getString("qo.sloc_id"), "10006");
+            assertTrue(rs.next());
+            assertEquals(rs.getString("c.cid"), "0000000004");
+            assertEquals(rs.getString("c.name"), "C4");
+            assertEquals(rs.getString("c.phone"), "999-999-4444");
+            assertEquals(rs.getString("c.address"), "404 XXX Street");
+            assertEquals(rs.getString("c.loc_id"), "10004");
+            assertEquals(rs.getDate("c.date"), new Date(format.parse("2013-11-22 14:22:56").getTime()));
+            assertEquals(rs.getString("qo.ooid"), "000000000000001");
+            assertEquals(rs.getString("qo.ocid"), "0000000004");
+            assertEquals(rs.getString("qo.oiid"), "0000000001");
+            assertEquals(rs.getInt("qo.oprice"), 100);
+            assertEquals(rs.getInt("qo.oquantity"), 1000);
+            assertEquals(rs.getTimestamp("qo.odate"), new Timestamp(format.parse("2013-11-22 14:22:56").getTime()));
+            assertEquals(rs.getString("qo.iiid"), "0000000001");
+            assertEquals(rs.getString("qo.iname"), "T1");
+            assertEquals(rs.getInt("qo.iprice"), 100);
+            assertEquals(rs.getInt("qo.idiscount1"), 5);
+            assertEquals(rs.getInt("qo.idiscount2"), 10);
+            assertEquals(rs.getString("qo.isid"), "0000000001");
+            assertEquals(rs.getString("qo.idescription"), "Item T1");
+            assertEquals(rs.getString("qo.ssid"), "0000000001");
+            assertEquals(rs.getString("qo.sname"), "S1");
+            assertEquals(rs.getString("qo.sphone"), "888-888-1111");
+            assertEquals(rs.getString("qo.saddress"), "101 YYY Street");
+            assertEquals(rs.getString("qo.sloc_id"), "10001");
+            assertTrue(rs.next());
+            assertEquals(rs.getString("c.cid"), "0000000004");
+            assertEquals(rs.getString("c.name"), "C4");
+            assertEquals(rs.getString("c.phone"), "999-999-4444");
+            assertEquals(rs.getString("c.address"), "404 XXX Street");
+            assertEquals(rs.getString("c.loc_id"), "10004");
+            assertEquals(rs.getDate("c.date"), new Date(format.parse("2013-11-22 14:22:56").getTime()));
+            assertEquals(rs.getString("qo.ooid"), "000000000000004");
+            assertEquals(rs.getString("qo.ocid"), "0000000004");
+            assertEquals(rs.getString("qo.oiid"), "0000000006");
+            assertEquals(rs.getInt("qo.oprice"), 510);
+            assertEquals(rs.getInt("qo.oquantity"), 4000);
+            assertEquals(rs.getTimestamp("qo.odate"), new Timestamp(format.parse("2013-11-26 13:26:04").getTime()));
+            assertEquals(rs.getString("qo.iiid"), "0000000006");
+            assertEquals(rs.getString("qo.iname"), "T6");
+            assertEquals(rs.getInt("qo.iprice"), 600);
+            assertEquals(rs.getInt("qo.idiscount1"), 8);
+            assertEquals(rs.getInt("qo.idiscount2"), 15);
+            assertEquals(rs.getString("qo.isid"), "0000000006");
+            assertEquals(rs.getString("qo.idescription"), "Item T6");
+            assertEquals(rs.getString("qo.ssid"), "0000000006");
+            assertEquals(rs.getString("qo.sname"), "S6");
+            assertEquals(rs.getString("qo.sphone"), "888-888-6666");
+            assertEquals(rs.getString("qo.saddress"), "606 YYY Street");
+            assertEquals(rs.getString("qo.sloc_id"), "10006");
+
+            assertFalse(rs.next());            
+            
+            rs = conn.createStatement().executeQuery("EXPLAIN " + query2);
+            assertEquals(plans[18], QueryUtil.getExplainPlan(rs));
+        } finally {
+            conn.close();
+        }
+    }
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 500ae5f..b0e28b4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -26,19 +26,25 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.BindTableNode;
 import org.apache.phoenix.parse.ColumnDef;
 import org.apache.phoenix.parse.CreateTableStatement;
 import org.apache.phoenix.parse.DerivedTableNode;
+import org.apache.phoenix.parse.FamilyWildcardParseNode;
 import org.apache.phoenix.parse.JoinTableNode;
 import org.apache.phoenix.parse.NamedTableNode;
+import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SingleTableStatement;
 import org.apache.phoenix.parse.TableName;
 import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.parse.TableNodeVisitor;
+import org.apache.phoenix.parse.TableWildcardParseNode;
+import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
@@ -57,6 +63,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.SchemaUtil;
@@ -385,7 +392,37 @@ public class FromCompiler {
 
         @Override
         public Void visit(DerivedTableNode subselectNode) throws SQLException {
-            throw new SQLFeatureNotSupportedException();
+            List<AliasedNode> selectNodes = subselectNode.getSelect().getSelect(); // select list of the derived table's inner query
+            List<PColumn> columns = new ArrayList<PColumn>();
+            int position = 0;
+            for (AliasedNode aliasedNode : selectNodes) { // build one placeholder column per aliased select item
+                String alias = aliasedNode.getAlias();
+                if (alias == null) { // no explicit alias: fall back to the alias reported by the node itself
+                    ParseNode node = aliasedNode.getNode();
+                    if (node instanceof WildcardParseNode 
+                            || node instanceof TableWildcardParseNode
+                            || node instanceof FamilyWildcardParseNode)
+                        throw new SQLException("Encountered wildcard in subqueries."); // unaliased wildcard items are rejected
+                    
+                    alias = SchemaUtil.normalizeIdentifier(node.getAlias());
+                }
+                if (alias != null) { // items that still have no alias are skipped and do not consume a position
+                    PColumnImpl column = new PColumnImpl(PNameFactory.newName(alias), 
+                            PNameFactory.newName(QueryConstants.DEFAULT_COLUMN_FAMILY), 
+                            null, 0, 0, true, position++, SortOrder.ASC, null, null, false); // NOTE(review): the null/0/0 arguments look like an unset data type, length and scale - confirm against PColumnImpl
+                    columns.add(column);
+                }
+            }
+            PTable t = PTableImpl.makePTable(null, PName.EMPTY_NAME, PName.EMPTY_NAME, // synthetic table with empty names that only carries the columns built above
+                    PTableType.SUBQUERY, null, MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, 
+                    null, null, columns, null, Collections.<PTable>emptyList(), false, 
+                    Collections.<PName>emptyList(), null, null, false, false, null, null);
+            
+            String alias = subselectNode.getAlias();
+            TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false);
+            tableMap.put(alias, tableRef); // register the synthetic table under the derived table's alias
+            tables.add(tableRef);
+            return null;
         }
 
         private static class ColumnFamilyRef {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
index 68a1218..858c7ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/IndexStatementRewriter.java
@@ -73,6 +73,11 @@ public class IndexStatementRewriter extends ParseNodeRewriter {
     public static SelectStatement translate(SelectStatement statement, ColumnResolver dataResolver, Map<TableRef, TableRef> multiTableRewriteMap) throws SQLException {
         return rewrite(statement, new IndexStatementRewriter(dataResolver, multiTableRewriteMap));
     }
+    
+    @Override
+    protected boolean visitDerivedTableNode() {
+        return false; // NOTE(review): presumably keeps this rewriter from descending into derived-table subselects - confirm against ParseNodeRewriter
+    }
 
     @Override
     public ParseNode visit(ColumnParseNode node) throws SQLException {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index 6136e4e..0293ee0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -40,7 +40,7 @@ import org.apache.phoenix.expression.CoerceExpression;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.function.CountAggregateFunction;
 import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.AliasedNode;
 import org.apache.phoenix.parse.AndParseNode;
 import org.apache.phoenix.parse.BetweenParseNode;
@@ -66,6 +66,7 @@ import org.apache.phoenix.parse.OrParseNode;
 import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.ParseNode;
 import org.apache.phoenix.parse.ParseNodeFactory;
+import org.apache.phoenix.parse.ParseNodeRewriter;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.StatelessTraverseAllParseNodeVisitor;
 import org.apache.phoenix.parse.TableName;
@@ -210,7 +211,10 @@ public class JoinCompiler {
         @Override
         public Pair<Table, List<JoinSpec>> visit(DerivedTableNode subselectNode)
                 throws SQLException {
-            throw new SQLFeatureNotSupportedException();
+            TableRef tableRef = resolveTable(subselectNode.getAlias(), null);
+            List<AliasedNode> selectNodes = extractFromSelect(statement.getSelect(), tableRef, origResolver);
+            Table table = new Table(subselectNode, selectNodes, tableRef);
+            return new Pair<Table, List<JoinSpec>>(table, null);
         }
     }
     
@@ -276,6 +280,10 @@ public class JoinCompiler {
             return tableRefs;
         }
         
+        public SelectStatement getStatement() {
+            return statement;
+        }
+        
         public ColumnResolver getOriginalResolver() {
             return origResolver;
         }
@@ -286,11 +294,11 @@ public class JoinCompiler {
         
         public void addFilter(ParseNode filter) throws SQLException {
             if (joinSpecs.isEmpty()) {
-                table.getPreFilters().add(filter);
+                table.addFilter(filter);
                 return;
             }
             
-            WhereNodeVisitor visitor = new WhereNodeVisitor(origResolver, table.getPreFilters(),
+            WhereNodeVisitor visitor = new WhereNodeVisitor(origResolver, table,
                     postFilters, Collections.<TableRef>singletonList(table.getTableRef()), 
                     hasRightJoin, prefilterAcceptedTables);
             filter.accept(visitor);
@@ -321,7 +329,7 @@ public class JoinCompiler {
         }
         
         public Expression compilePostFilterExpression(StatementContext context) throws SQLException {
-            if (postFilters == null || postFilters.isEmpty())
+            if (postFilters.isEmpty())
                 return null;
             
             ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
@@ -347,9 +355,10 @@ public class JoinCompiler {
          */
         public boolean[] getStarJoinVector() {
             int count = joinSpecs.size();
-            if (!useStarJoin 
-                    && count > 1 
-                    && joinSpecs.get(count - 1).getType() != JoinType.Left)
+            if (!table.isFlat() ||
+                    (!useStarJoin 
+                            && count > 1 
+                            && joinSpecs.get(count - 1).getType() != JoinType.Left))
                 return null;
 
             boolean[] vector = new boolean[count];
@@ -376,11 +385,14 @@ public class JoinCompiler {
                 new JoinTable(table);
         }
         
-        public SelectStatement getSubqueryWithoutJoin(boolean asSubquery) {
+        public SelectStatement getAsSingleSubquery(SelectStatement query, boolean asSubquery) throws SQLException { // returns query itself, or a select taking FROM and WHERE from query and every other clause from the enclosing statement
+            if (!isFlat(query)) // non-flat queries are rejected as unsupported
+                throw new SQLFeatureNotSupportedException("Complex subqueries not supported as left join table.");
+            
             if (asSubquery)
-                return table.getAsSubquery();
+                return query; // asSubquery: the given query is returned unchanged
             
-            return NODE_FACTORY.select(Collections.<TableNode>singletonList(table.getTableNode()), statement.getHint(), statement.isDistinct(), statement.getSelect(), table.getPreFiltersCombined(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate());
+            return NODE_FACTORY.select(query.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), query.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate());            
         }
         
         public boolean hasPostReference() {
@@ -564,17 +576,32 @@ public class JoinCompiler {
     public class Table {
         private final TableNode tableNode;
         private final List<ColumnDef> dynamicColumns;
+        private final SelectStatement subselect;
         private final TableRef tableRef;
         private final List<AliasedNode> selectNodes; // all basic nodes related to this table, no aggregation.
         private final List<ParseNode> preFilters;
+        private final List<ParseNode> postFilters;
         
         private Table(TableNode tableNode, List<ColumnDef> dynamicColumns, 
                 List<AliasedNode> selectNodes, TableRef tableRef) {
             this.tableNode = tableNode;
             this.dynamicColumns = dynamicColumns;
+            this.subselect = null;
             this.tableRef = tableRef;
             this.selectNodes = selectNodes;
             this.preFilters = new ArrayList<ParseNode>();
+            this.postFilters = Collections.<ParseNode>emptyList();
+        }
+        
+        private Table(DerivedTableNode tableNode, 
+                List<AliasedNode> selectNodes, TableRef tableRef) {
+            this.tableNode = tableNode;
+            this.dynamicColumns = Collections.<ColumnDef>emptyList();
+            this.subselect = tableNode.getSelect();
+            this.tableRef = tableRef;
+            this.selectNodes = selectNodes;
+            this.preFilters = Collections.<ParseNode>emptyList();
+            this.postFilters = new ArrayList<ParseNode>();
         }
         
         public TableNode getTableNode() {
@@ -585,6 +612,10 @@ public class JoinCompiler {
             return dynamicColumns;
         }
         
+        public boolean isSubselect() {
+            return subselect != null;
+        }
+        
         public List<AliasedNode> getSelectNodes() {
             return selectNodes;
         }
@@ -593,31 +624,45 @@ public class JoinCompiler {
             return preFilters;
         }
         
+        public List<ParseNode> getPostFilters() {
+            return postFilters;
+        }
+        
         public TableRef getTableRef() {
             return tableRef;
         }
         
-        public ParseNode getPreFiltersCombined() {
-            if (preFilters == null || preFilters.isEmpty())
-                return null;
-            
-            if (preFilters.size() == 1)
-                return preFilters.get(0);
+        public void addFilter(ParseNode filter) {
+            if (!isSubselect()) {
+                preFilters.add(filter);
+                return;
+            }
             
-            return NODE_FACTORY.and(preFilters);
+            postFilters.add(filter);
+        }
+        
+        public ParseNode getPreFiltersCombined() {
+            return combine(preFilters);
         }
         
-        public SelectStatement getAsSubquery() {
-            // TODO handle DerivedTableNode differently?
+        public SelectStatement getAsSubquery() throws SQLException {
+            if (isSubselect()) // derived table: return its own select with this table's post-filters applied
+                return SubqueryColumnAliasRewriter.applyPostFilters(subselect, postFilters, tableNode.getAlias());
+            
             List<TableNode> from = Collections.<TableNode>singletonList(tableNode);
             return NODE_FACTORY.select(from, statement.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, null, null, 0, false);
         }
         
+        public boolean isFlat() {
+            return subselect == null || JoinCompiler.isFlat(subselect);
+        }
+        
         protected boolean isWildCardSelect() {
             return (selectNodes.size() == 1 && selectNodes.get(0).getNode() instanceof TableWildcardParseNode);
         }
 
         public void projectColumns(Scan scan) {
+            assert(!isSubselect());
             if (isWildCardSelect()) {
                 scan.getFamilyMap().clear();
                 return;
@@ -631,6 +676,7 @@ public class JoinCompiler {
         }
         
         public ProjectedPTableWrapper createProjectedTable(boolean retainPKColumns) throws SQLException {
+            assert(!isSubselect());
             List<PColumn> projectedColumns = new ArrayList<PColumn>();
             List<Expression> sourceExpressions = new ArrayList<Expression>();
             ListMultimap<String, String> columnNameMap = ArrayListMultimap.<String, String>create();
@@ -646,7 +692,7 @@ public class JoinCompiler {
                 for (PColumn column : table.getColumns()) {
                     if (!retainPKColumns || !SchemaUtil.isPKColumn(column)) {
                         addProjectedColumn(projectedColumns, sourceExpressions, columnNameMap,
-                                column, PNameFactory.newName(ScanProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn);
+                                column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn);
                     }
                 }
             } else {
@@ -657,7 +703,7 @@ public class JoinCompiler {
                             && (!retainPKColumns || !SchemaUtil.isPKColumn(columnRef.getColumn()))) {
                         PColumn column = columnRef.getColumn();
                         addProjectedColumn(projectedColumns, sourceExpressions, columnNameMap,
-                                column, PNameFactory.newName(ScanProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn);
+                                column, PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), hasSaltingColumn);
                     }
                 }               
             }
@@ -696,6 +742,28 @@ public class JoinCompiler {
             projectedColumns.add(column);
             sourceExpressions.add(sourceExpression);
         }
+        
+        public ProjectedPTableWrapper createProjectedTable(RowProjector rowProjector) throws SQLException { // derived-table variant: projected columns are built from the given row projector's expressions
+            assert(isSubselect()); // only valid for tables created from a DerivedTableNode
+            List<PColumn> projectedColumns = new ArrayList<PColumn>();
+            List<Expression> sourceExpressions = new ArrayList<Expression>();
+            ListMultimap<String, String> columnNameMap = ArrayListMultimap.<String, String>create(); // never populated in this method; passed to the wrapper empty
+            PTable table = tableRef.getTable();
+            for (PColumn column : table.getColumns()) {
+                String colName = getProjectedColumnName(null, tableRef.getTableAlias(), column.getName().getString());
+                Expression sourceExpression = rowProjector.getColumnProjector(column.getPosition()).getExpression(); // looked up by column position; the projected column takes its type, length, scale, nullability and sort order from this expression
+                PColumnImpl projectedColumn = new PColumnImpl(PNameFactory.newName(colName), PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), 
+                        sourceExpression.getDataType(), sourceExpression.getMaxLength(), sourceExpression.getScale(), sourceExpression.isNullable(), 
+                        column.getPosition(), sourceExpression.getSortOrder(), column.getArraySize(), column.getViewConstant(), column.isViewReferenced());                
+                projectedColumns.add(projectedColumn);
+                sourceExpressions.add(sourceExpression);
+            }
+            PTable t = PTableImpl.makePTable(table.getTenantId(), PNameFactory.newName(PROJECTED_TABLE_SCHEMA), table.getName(), PTableType.JOIN, // new JOIN-type table under PROJECTED_TABLE_SCHEMA; other attributes are copied from the source table
+                        table.getIndexState(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(),
+                        null, projectedColumns, table.getParentTableName(),
+                        table.getIndexes(), table.isImmutableRows(), Collections.<PName>emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getViewType(), table.getViewIndexId());
+            return new ProjectedPTableWrapper(t, columnNameMap, sourceExpressions);
+        }
     }
     
     private static abstract class ConditionNodeVisitor extends TraverseNoParseNodeVisitor<Void> {
@@ -770,17 +838,17 @@ public class JoinCompiler {
     
     private static class WhereNodeVisitor extends ConditionNodeVisitor {
         private ColumnResolver resolver;
-        private List<ParseNode> preFilters;
+        private Table table;
         private List<ParseNode> postFilters;
         private List<TableRef> selfTableRefs;
         private boolean hasRightJoin;
         private List<JoinTable> prefilterAcceptedTables;
         
-        public WhereNodeVisitor(ColumnResolver resolver, List<ParseNode> preFilters,
+        public WhereNodeVisitor(ColumnResolver resolver, Table table,
                 List<ParseNode> postFilters, List<TableRef> selfTableRefs, boolean hasRightJoin, 
                 List<JoinTable> prefilterAcceptedTables) {
             this.resolver = resolver;
-            this.preFilters = preFilters;
+            this.table = table;
             this.postFilters = postFilters;
             this.selfTableRefs = selfTableRefs;
             this.hasRightJoin = hasRightJoin;
@@ -796,7 +864,7 @@ public class JoinCompiler {
             case NONE:
             case SELF_ONLY:
                 if (!hasRightJoin) {
-                    preFilters.add(node);
+                    table.addFilter(node);
                 } else {
                     postFilters.add(node);
                 }
@@ -956,10 +1024,85 @@ public class JoinCompiler {
         }
     }
     
+    private static class SubqueryColumnAliasRewriter extends ParseNodeRewriter { // replaces column references that match a subquery select alias with the select expression behind that alias
+        List<AliasedNode> effectiveSelectNodes = new ArrayList<AliasedNode>(); // select items that have an explicit alias or one reported by their node
+        Map<String, ParseNode> aliasMap = new HashMap<String, ParseNode>(); // key: SchemaUtil.getColumnName(subqueryAlias, alias); value: the aliased select expression
+        
+        public static SelectStatement applyPostFilters(SelectStatement statement, List<ParseNode> postFilters, String subqueryAlias) throws SQLException { // folds postFilters into statement's WHERE (no GROUP BY) or HAVING (with GROUP BY)
+            if (postFilters.isEmpty()) // nothing to apply: the statement is returned unchanged
+                return statement;
+            
+            SubqueryColumnAliasRewriter rewriter = new SubqueryColumnAliasRewriter(statement.getSelect(), subqueryAlias);
+            List<ParseNode> postFiltersRewrite = new ArrayList<ParseNode>(postFilters.size());
+            for (ParseNode node : postFilters) {
+                postFiltersRewrite.add(node.accept(rewriter));
+            }
+            
+            if (statement.getGroupBy().isEmpty()) { // no GROUP BY: AND the rewritten filters with any existing WHERE
+                ParseNode where = statement.getWhere();
+                if (where != null) {
+                    postFiltersRewrite.add(where);
+                }
+                return NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), rewriter.effectiveSelectNodes, combine(postFiltersRewrite), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate());
+            }
+            
+            ParseNode having = statement.getHaving(); // GROUP BY present: AND the rewritten filters with any existing HAVING instead
+            if (having != null) {
+                postFiltersRewrite.add(having);
+            }
+            return NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), rewriter.effectiveSelectNodes, statement.getWhere(), statement.getGroupBy(), combine(postFiltersRewrite), statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate());
+        }
+        
+        private SubqueryColumnAliasRewriter(List<AliasedNode> aliasedNodes, String subqueryAlias) {
+            for (AliasedNode aliasedNode : aliasedNodes) {
+                String alias = aliasedNode.getAlias();
+                ParseNode node = aliasedNode.getNode();
+                if (alias == null) { // no explicit alias: fall back to the alias reported by the node itself
+                    alias = SchemaUtil.normalizeIdentifier(node.getAlias());
+                }
+                if (alias != null) { // items with no alias at all are left out of both collections
+                    effectiveSelectNodes.add(aliasedNode);
+                    aliasMap.put(SchemaUtil.getColumnName(subqueryAlias, alias), node);
+                }
+            }
+        }
+        
+        
+        @Override
+        public ParseNode visit(ColumnParseNode node) throws SQLException {
+            if (node.getTableName() != null) { // only table-qualified references are looked up in aliasMap
+                ParseNode aliasedNode = aliasMap.get(node.getFullName());
+                if (aliasedNode != null) {
+                    return aliasedNode;
+                }
+            }
+            return node; // unqualified or unmatched references are returned unchanged
+        }        
+    }
+    
     private static String PROJECTED_TABLE_SCHEMA = ".";
     // for creation of new statements
     private static ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
     
+    private static boolean isFlat(SelectStatement select) { // true when select has no join, aggregate, DISTINCT or LIMIT and its first FROM entry is not a derived table
+        // TODO flatten single nested query in normalization
+        return !select.isJoin() 
+                && !select.isAggregate() 
+                && !select.isDistinct() 
+                && !(select.getFrom().get(0) instanceof DerivedTableNode)
+                && select.getLimit() == null;
+    }
+    
+    private static ParseNode combine(List<ParseNode> nodes) { // ANDs the given nodes into one; callers must handle a null result
+        if (nodes.isEmpty()) // empty list: there is nothing to combine
+            return null;
+        
+        if (nodes.size() == 1) // single node: returned as-is, without an AND wrapper
+            return nodes.get(0);
+        
+        return NODE_FACTORY.and(nodes);
+    }
+    
     private static List<AliasedNode> extractFromSelect(List<AliasedNode> select, TableRef tableRef, ColumnResolver resolver) throws SQLException {
         List<AliasedNode> ret = new ArrayList<AliasedNode>();
         ColumnParseNodeVisitor visitor = new ColumnParseNodeVisitor(resolver);
@@ -1032,6 +1175,8 @@ public class JoinCompiler {
         final Map<TableRef, TableRef> replacement = new HashMap<TableRef, TableRef>();
         
         for (Table table : join.getTables()) {
+            if (table.isSubselect())
+                continue;
             TableRef tableRef = table.getTableRef();
             List<ParseNode> groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null;
             List<OrderByNode> orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null;
@@ -1099,7 +1244,7 @@ public class JoinCompiler {
                 @Override
                 public TableNode visit(DerivedTableNode subselectNode)
                         throws SQLException {
-                    throw new SQLFeatureNotSupportedException();
+                    return subselectNode;
                 }
             }));
         }
@@ -1130,10 +1275,6 @@ public class JoinCompiler {
         return NODE_FACTORY.select(from, hintNode, false, selectList, where, groupBy, null, orderBy, null, 0, false);
     }
     
-    public static ScanProjector getScanProjector(ProjectedPTableWrapper table) {
-    	return new ScanProjector(table);
-    }
-    
     public class PTableWrapper {
     	protected PTable table;
     	protected ListMultimap<String, String> columnNameMap;
@@ -1168,7 +1309,7 @@ public class JoinCompiler {
             for (PColumn c : right.getColumns()) {
                 if (!SchemaUtil.isPKColumn(c)) {
                     PColumnImpl column = new PColumnImpl(c.getName(), 
-                            PNameFactory.newName(ScanProjector.VALUE_COLUMN_FAMILY), c.getDataType(), 
+                            PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), c.getDataType(), 
                             c.getMaxLength(), c.getScale(), innerJoin ? c.isNullable() : true, position++, 
                             c.getSortOrder(), c.getArraySize(), c.getViewConstant(), c.isViewReferenced());
                     merged.add(column);
@@ -1200,6 +1341,10 @@ public class JoinCompiler {
     	public Expression getSourceExpression(PColumn column) {
     		return sourceExpressions.get(column.getPosition() - (table.getBucketNum() == null ? 0 : 1));
     	}
+        
+        public TupleProjector createTupleProjector() {
+            return new TupleProjector(this);
+        }
     }
     
     public static class JoinedTableColumnResolver implements ColumnResolver {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 729459d..c1ea4ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -43,7 +43,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.parse.ParseNode;
@@ -136,18 +136,41 @@ public class QueryCompiler {
         List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
         if (joinSpecs.isEmpty()) {
             Table table = joinTable.getTable();
-            ProjectedPTableWrapper projectedTable = table.createProjectedTable(!asSubquery);
-            ScanProjector.serializeProjectorIntoScan(context.getScan(), JoinCompiler.getScanProjector(projectedTable));
-            context.setCurrentTable(table.getTableRef());
+            SelectStatement subquery = table.getAsSubquery();
+            if (!table.isSubselect()) {
+                ProjectedPTableWrapper projectedTable = table.createProjectedTable(!asSubquery);
+                TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector());
+                context.setCurrentTable(table.getTableRef());
+                context.setResolver(projectedTable.createColumnResolver());
+                table.projectColumns(context.getScan());
+                return compileSingleQuery(context, subquery, binds, null);
+            }
+            QueryPlan plan = compileSubquery(subquery);
+            ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector());
             context.setResolver(projectedTable.createColumnResolver());
-            table.projectColumns(context.getScan());
-            return compileSingleQuery(context, table.getAsSubquery(), binds, null);
+            context.setClientTupleProjector(projectedTable.createTupleProjector());
+            return plan;
         }
         
         boolean[] starJoinVector = joinTable.getStarJoinVector();
         if (starJoinVector != null) {
             Table table = joinTable.getTable();
-            ProjectedPTableWrapper initialProjectedTable = table.createProjectedTable(!asSubquery);
+            ProjectedPTableWrapper initialProjectedTable;
+            TableRef tableRef;
+            SelectStatement query;
+            if (!table.isSubselect()) {
+                initialProjectedTable = table.createProjectedTable(!asSubquery);
+                tableRef = table.getTableRef();
+                table.projectColumns(context.getScan());
+                query = joinTable.getAsSingleSubquery(table.getAsSubquery(), asSubquery);
+            } else {
+                SelectStatement subquery = table.getAsSubquery();
+                QueryPlan plan = compileSubquery(subquery);
+                initialProjectedTable = table.createProjectedTable(plan.getProjector());
+                tableRef = plan.getTableRef();
+                context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+                query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+            }
             PTableWrapper projectedTable = initialProjectedTable;
             int count = joinSpecs.size();
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
@@ -157,14 +180,17 @@ public class QueryCompiler {
             PTable[] tables = new PTable[count];
             int[] fieldPositions = new int[count];
             QueryPlan[] joinPlans = new QueryPlan[count];
+            TupleProjector[] clientProjectors = new TupleProjector[count];
             fieldPositions[0] = projectedTable.getTable().getColumns().size() - projectedTable.getTable().getPKColumns().size();
-            boolean needsProject = asSubquery;
+            boolean forceProjection = table.isSubselect();
+            boolean needsProject = forceProjection || asSubquery;
             for (int i = 0; i < count; i++) {
                 JoinSpec joinSpec = joinSpecs.get(i);
                 Scan subScan = ScanUtil.newScan(originalScan);
                 StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan);
                 joinPlans[i] = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true);
                 ColumnResolver resolver = subContext.getResolver();
+                clientProjectors[i] = subContext.getClientTupleProjector();
                 boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
                 if (hasPostReference) {
                     PTableWrapper subProjTable = ((JoinedTableColumnResolver) (resolver)).getPTableWrapper();
@@ -177,7 +203,7 @@ public class QueryCompiler {
                 if (!starJoinVector[i]) {
                     needsProject = true;
                 }
-                ColumnResolver leftResolver = starJoinVector[i] ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver();
+                ColumnResolver leftResolver = (!forceProjection && starJoinVector[i]) ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver();
                 joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
                 Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, leftResolver, resolver);
                 joinExpressions[i] = joinConditions.getFirst();
@@ -188,15 +214,14 @@ public class QueryCompiler {
                 }
             }
             if (needsProject) {
-                ScanProjector.serializeProjectorIntoScan(context.getScan(), JoinCompiler.getScanProjector(initialProjectedTable));
+                TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector());
             }
-            context.setCurrentTable(table.getTableRef());
+            context.setCurrentTable(tableRef);
             context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
-            table.projectColumns(context.getScan());
-            BasicQueryPlan plan = compileSingleQuery(context, joinTable.getSubqueryWithoutJoin(asSubquery), binds, parallelIteratorFactory);
+            BasicQueryPlan plan = compileSingleQuery(context, query, binds, parallelIteratorFactory);
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression);
-            return new HashJoinPlan(plan, joinInfo, hashExpressions, joinPlans);
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, forceProjection);
+            return new HashJoinPlan(joinTable.getStatement(), plan, joinInfo, hashExpressions, joinPlans, clientProjectors);
         }
         
         JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
@@ -210,35 +235,56 @@ public class QueryCompiler {
             
             JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
             Table rhsTable = rhsJoinTable.getTable();
-            SelectStatement rhs = rhsJoinTable.getSubqueryWithoutJoin(asSubquery);
             JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
             Scan subScan = ScanUtil.newScan(originalScan);
             StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan);
             QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true);
             ColumnResolver lhsResolver = lhsCtx.getResolver();
+            TupleProjector clientProjector = lhsCtx.getClientTupleProjector();
             PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) (lhsResolver)).getPTableWrapper();
-            ProjectedPTableWrapper rhsProjTable = rhsTable.createProjectedTable(!asSubquery);
-            ColumnResolver rhsResolver = joinTable.getOriginalResolver();
+            ProjectedPTableWrapper rhsProjTable;
+            TableRef rhsTableRef;
+            SelectStatement rhs;
+            if (!rhsTable.isSubselect()) {
+                rhsProjTable = rhsTable.createProjectedTable(!asSubquery);
+                rhsTableRef = rhsTable.getTableRef();
+                rhsTable.projectColumns(context.getScan());
+                rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(), asSubquery);
+            } else {
+                SelectStatement subquery = rhsTable.getAsSubquery();
+                QueryPlan plan = compileSubquery(subquery);
+                rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
+                rhsTableRef = plan.getTableRef();
+                context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
+                rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
+            }
+            boolean forceProjection = rhsTable.isSubselect();
+            ColumnResolver rhsResolver = forceProjection ? rhsProjTable.createColumnResolver() : joinTable.getOriginalResolver();
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
             Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(context, lhsResolver, rhsResolver);
             List<Expression> joinExpressions = joinConditions.getSecond();
             List<Expression> hashExpressions = joinConditions.getFirst();
             int fieldPosition = rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size();
             PTableWrapper projectedTable = rhsProjTable.mergeProjectedTables(lhsProjTable, type == JoinType.Inner);
-            ScanProjector.serializeProjectorIntoScan(context.getScan(), JoinCompiler.getScanProjector(rhsProjTable));
-            context.setCurrentTable(rhsTable.getTableRef());
+            TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector());
+            context.setCurrentTable(rhsTableRef);
             context.setResolver(projectedTable.createColumnResolver());
-            rhsTable.projectColumns(context.getScan());
             BasicQueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, parallelIteratorFactory);
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context);
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression);
-            return new HashJoinPlan(rhsPlan, joinInfo, new List[] {hashExpressions}, new QueryPlan[] {lhsPlan});
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Inner ? type : JoinType.Left}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, forceProjection);
+            return new HashJoinPlan(joinTable.getStatement(), rhsPlan, joinInfo, new List[] {hashExpressions}, new QueryPlan[] {lhsPlan}, new TupleProjector[] {clientProjector});
         }
         
         // Do not support queries like "A right join B left join C" with hash-joins.
         throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported.");
     }
     
+    protected QueryPlan compileSubquery(SelectStatement subquery) throws SQLException {
+        ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, this.statement.getConnection());
+        QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver).compile();
+        return statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
+    }
+    
     protected BasicQueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, ParallelIteratorFactory parallelIteratorFactory) throws SQLException{
         PhoenixConnection connection = statement.getConnection();
         ColumnResolver resolver = context.getResolver();

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
index 168f392..27f9e82 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.query.*;
 import org.apache.phoenix.schema.*;
 import org.apache.phoenix.util.*;
@@ -60,6 +61,7 @@ public class StatementContext {
 
     private TableRef currentTable;
     private List<Pair<byte[], byte[]>> whereConditionColumns;
+    private TupleProjector clientTupleProjector;
     
     public StatementContext(PhoenixStatement statement) {
         this(statement, FromCompiler.EMPTY_TABLE_RESOLVER, new Scan());
@@ -226,4 +228,12 @@ public class StatementContext {
     public List<Pair<byte[], byte[]>> getWhereCoditionColumns() {
         return whereConditionColumns;
     }
+    
+    public TupleProjector getClientTupleProjector() {
+        return clientTupleProjector;
+    }
+    
+    public void setClientTupleProjector(TupleProjector projector) {
+        this.clientTupleProjector = projector;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
index 0435301..855ed75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/StatementNormalizer.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.compile;
 
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.Collections;
 import java.util.List;
 
@@ -137,7 +136,8 @@ public class StatementNormalizer extends ParseNodeRewriter {
         @Override
         public List<TableName> visit(DerivedTableNode subselectNode)
                 throws SQLException {
-            throw new SQLFeatureNotSupportedException();
+            TableName name = TableName.create(null, subselectNode.getAlias());
+            return Collections.singletonList(name);
         }
     };
     

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index ae3c1c4..0d7063b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -59,7 +59,7 @@ import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.memory.GlobalMemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
@@ -113,7 +113,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
                         .getAttribute(BaseScannerRegionObserver.AGGREGATORS), c
                         .getEnvironment().getConfiguration());
 
-        final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+        final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         long limit = Long.MAX_VALUE;
         byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT);

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 8c9eeb2..0be219f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -37,8 +37,8 @@ import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.ScanProjector;
-import org.apache.phoenix.join.ScanProjector.ProjectedValueTuple;
+import org.apache.phoenix.join.TupleProjector;
+import org.apache.phoenix.join.TupleProjector.ProjectedValueTuple;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.KeyValueSchema;
@@ -50,7 +50,7 @@ import org.apache.phoenix.util.TupleUtil;
 public class HashJoinRegionScanner implements RegionScanner {
     
     private final RegionScanner scanner;
-    private final ScanProjector projector;
+    private final TupleProjector projector;
     private final HashJoinInfo joinInfo;
     private Queue<Tuple> resultQueue;
     private boolean hasMore;
@@ -60,7 +60,7 @@ public class HashJoinRegionScanner implements RegionScanner {
     private ValueBitSet[] tempSrcBitSet;
     
     @SuppressWarnings("unchecked")
-    public HashJoinRegionScanner(RegionScanner scanner, ScanProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException {
+    public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException {
         this.scanner = scanner;
         this.projector = projector;
         this.joinInfo = joinInfo;
@@ -98,8 +98,11 @@ public class HashJoinRegionScanner implements RegionScanner {
             return;
         
         Tuple tuple = new ResultTuple(Result.create(result));
+        if (joinInfo == null || joinInfo.forceProjection()) {
+            tuple = projector.projectResults(tuple);
+        }
         if (joinInfo == null) {
-            resultQueue.offer(projector.projectResults(tuple));
+            resultQueue.offer(tuple);
             return;
         }
         
@@ -130,7 +133,10 @@ public class HashJoinRegionScanner implements RegionScanner {
                 }
             } else {
                 KeyValueSchema schema = joinInfo.getJoinedSchema();
-                resultQueue.offer(projector.projectResults(tuple));
+                if (!joinInfo.forceProjection()) {
+                    tuple = projector.projectResults(tuple);
+                }
+                resultQueue.offer(tuple);
                 for (int i = 0; i < count; i++) {
                     boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
                     if (earlyEvaluation && tempTuples[i] == null)
@@ -150,7 +156,7 @@ public class HashJoinRegionScanner implements RegionScanner {
                         }
                         for (Tuple t : tempTuples[i]) {
                             Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
-                                    lhs : ScanProjector.mergeProjectedValue(
+                                    lhs : TupleProjector.mergeProjectedValue(
                                             (ProjectedValueTuple) lhs, schema, tempDestBitSet,
                                             t, joinInfo.getSchemas()[i], tempSrcBitSet[i], 
                                             joinInfo.getFieldPositions()[i]);

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 84d6b0e..51d9033 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -48,7 +48,7 @@ import org.apache.phoenix.iterate.OrderedResultIterator;
 import org.apache.phoenix.iterate.RegionScannerResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
@@ -177,7 +177,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
             return s;
         }
         
-        final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+        final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
         

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 0139400..995889d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -60,7 +60,7 @@ import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.join.ScanProjector;
+import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ConstraintViolationException;
@@ -131,7 +131,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             return s;
         }
         
-        final ScanProjector p = ScanProjector.deserializeProjectorFromScan(scan);
+        final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         RegionScanner theScanner = s;
         if (p != null || j != null)  {

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/16e74c68/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 68e45ad..abd4475 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -43,6 +43,7 @@ import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.join.TupleProjector;
 import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
@@ -57,17 +58,23 @@ import com.google.common.collect.Lists;
 public class HashJoinPlan implements QueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
     
-    private BasicQueryPlan plan;
-    private HashJoinInfo joinInfo;
-    private List<Expression>[] hashExpressions;
-    private QueryPlan[] hashPlans;
+    private final FilterableStatement statement;
+    private final BasicQueryPlan plan;
+    private final HashJoinInfo joinInfo;
+    private final List<Expression>[] hashExpressions;
+    private final QueryPlan[] hashPlans;
+    private final TupleProjector[] clientProjectors;
     
-    public HashJoinPlan(BasicQueryPlan plan, HashJoinInfo joinInfo,
-            List<Expression>[] hashExpressions, QueryPlan[] hashPlans) {
+    public HashJoinPlan(FilterableStatement statement, 
+            BasicQueryPlan plan, HashJoinInfo joinInfo,
+            List<Expression>[] hashExpressions, QueryPlan[] hashPlans, 
+            TupleProjector[] clientProjectors) {
+        this.statement = statement;
         this.plan = plan;
         this.joinInfo = joinInfo;
         this.hashExpressions = hashExpressions;
         this.hashPlans = hashPlans;
+        this.clientProjectors = clientProjectors;
     }
 
     @Override
@@ -110,7 +117,7 @@ public class HashJoinPlan implements QueryPlan {
                 public ServerCache call() throws Exception {
                     QueryPlan hashPlan = hashPlans[index];
                     ServerCache cache = hashClient.addHashCache(ranges, hashPlan.iterator(), 
-                            hashPlan.getEstimatedSize(), hashExpressions[index], plan.getTableRef());
+                            clientProjectors[index], hashPlan.getEstimatedSize(), hashExpressions[index], plan.getTableRef());
                     long endTime = System.currentTimeMillis();
                     boolean isSet = firstJobEndTime.compareAndSet(0, endTime);
                     if (!isSet && (endTime - firstJobEndTime.get()) > maxServerCacheTimeToLive) {
@@ -205,7 +212,7 @@ public class HashJoinPlan implements QueryPlan {
 
     @Override
     public FilterableStatement getStatement() {
-        return plan.getStatement();
+        return statement;
     }
 
     @Override