You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2014/11/14 22:10:34 UTC

[1/4] phoenix git commit: PHOENIX-1799 Support many-to-many joins

Repository: phoenix
Updated Branches:
  refs/heads/4.0 4a675d84d -> ebc7ee42c


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
new file mode 100644
index 0000000..03eda06
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -0,0 +1,628 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.sql.ParameterMetaData;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
+import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.compile.RowProjector;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.iterate.MappedByteBufferQueue;
+import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
+import org.apache.phoenix.parse.FilterableStatement;
+import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.KeyValueSchema;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.ValueBitSet;
+import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ResultUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+import com.google.common.collect.Lists;
+
+public class SortMergeJoinPlan implements QueryPlan {
+    private static final byte[] EMPTY_PTR = new byte[0];
+    
+    private final StatementContext context;
+    private final FilterableStatement statement;
+    private final TableRef table;
+    private final JoinType type;
+    private final QueryPlan lhsPlan;
+    private final QueryPlan rhsPlan;
+    private final List<Expression> lhsKeyExpressions;
+    private final List<Expression> rhsKeyExpressions;
+    private final KeyValueSchema joinedSchema;
+    private final KeyValueSchema lhsSchema;
+    private final KeyValueSchema rhsSchema;
+    private final int rhsFieldPosition;
+
+    public SortMergeJoinPlan(StatementContext context, FilterableStatement statement, TableRef table, 
+            JoinType type, QueryPlan lhsPlan, QueryPlan rhsPlan, List<Expression> lhsKeyExpressions, List<Expression> rhsKeyExpressions,
+            PTable joinedTable, PTable lhsTable, PTable rhsTable, int rhsFieldPosition) {
+        if (type == JoinType.Right) throw new IllegalArgumentException("JoinType should not be " + type);
+        this.context = context;
+        this.statement = statement;
+        this.table = table;
+        this.type = type;
+        this.lhsPlan = lhsPlan;
+        this.rhsPlan = rhsPlan;
+        this.lhsKeyExpressions = lhsKeyExpressions;
+        this.rhsKeyExpressions = rhsKeyExpressions;
+        this.joinedSchema = buildSchema(joinedTable);
+        this.lhsSchema = buildSchema(lhsTable);
+        this.rhsSchema = buildSchema(rhsTable);
+        this.rhsFieldPosition = rhsFieldPosition;
+    }
+
+    private static KeyValueSchema buildSchema(PTable table) {
+        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
+        if (table != null) {
+            for (PColumn column : table.getColumns()) {
+                if (!SchemaUtil.isPKColumn(column)) {
+                    builder.addField(column);
+                }
+            }
+        }
+        return builder.build();
+    }
+
+    @Override
+    public ResultIterator iterator() throws SQLException {        
+        return type == JoinType.Semi || type == JoinType.Anti ? 
+                new SemiAntiJoinIterator(lhsPlan.iterator(), rhsPlan.iterator()) :
+                new BasicJoinIterator(lhsPlan.iterator(), rhsPlan.iterator());
+    }
+
+    @Override
+    public ExplainPlan getExplainPlan() throws SQLException {
+        List<String> steps = Lists.newArrayList();
+        steps.add("SORT-MERGE-JOIN (" + type.toString().toUpperCase() + ") TABLES");
+        for (String step : lhsPlan.getExplainPlan().getPlanSteps()) {
+            steps.add("    " + step);            
+        }
+        steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : ""));
+        for (String step : rhsPlan.getExplainPlan().getPlanSteps()) {
+            steps.add("    " + step);            
+        }
+        return new ExplainPlan(steps);
+    }
+
+    @Override
+    public StatementContext getContext() {
+        return context;
+    }
+
+    @Override
+    public ParameterMetaData getParameterMetaData() {
+        return PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA;
+    }
+
+    @Override
+    public long getEstimatedSize() {
+        return lhsPlan.getEstimatedSize() + rhsPlan.getEstimatedSize();
+    }
+
+    @Override
+    public TableRef getTableRef() {
+        return table;
+    }
+
+    @Override
+    public RowProjector getProjector() {
+        return null;
+    }
+
+    @Override
+    public Integer getLimit() {
+        return null;
+    }
+
+    @Override
+    public OrderBy getOrderBy() {
+        return null;
+    }
+
+    @Override
+    public GroupBy getGroupBy() {
+        return null;
+    }
+
+    @Override
+    public List<KeyRange> getSplits() {
+        return Collections.<KeyRange> emptyList();
+    }
+
+    @Override
+    public List<List<Scan>> getScans() {
+        return Collections.<List<Scan>> emptyList();
+    }
+
+    @Override
+    public FilterableStatement getStatement() {
+        return statement;
+    }
+
+    @Override
+    public boolean isDegenerate() {
+        return false;
+    }
+
+    @Override
+    public boolean isRowKeyOrdered() {
+        return false;
+    }
+    
+    private class BasicJoinIterator implements ResultIterator {
+        private final ResultIterator lhsIterator;
+        private final ResultIterator rhsIterator;
+        private boolean initialized;
+        private Tuple lhsTuple;
+        private Tuple rhsTuple;
+        private JoinKey lhsKey;
+        private JoinKey rhsKey;
+        private Tuple nextLhsTuple;
+        private Tuple nextRhsTuple;
+        private JoinKey nextLhsKey;
+        private JoinKey nextRhsKey;
+        private ValueBitSet destBitSet;
+        private ValueBitSet lhsBitSet;
+        private ValueBitSet rhsBitSet;
+        private byte[] emptyProjectedValue;
+        private MappedByteBufferTupleQueue queue;
+        private Iterator<Tuple> queueIterator;
+        
+        public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
+            this.lhsIterator = lhsIterator;
+            this.rhsIterator = rhsIterator;
+            this.initialized = false;
+            this.lhsTuple = null;
+            this.rhsTuple = null;
+            this.lhsKey = new JoinKey(lhsKeyExpressions);
+            this.rhsKey = new JoinKey(rhsKeyExpressions);
+            this.nextLhsTuple = null;
+            this.nextRhsTuple = null;
+            this.nextLhsKey = new JoinKey(lhsKeyExpressions);
+            this.nextRhsKey = new JoinKey(rhsKeyExpressions);
+            this.destBitSet = ValueBitSet.newInstance(joinedSchema);
+            this.lhsBitSet = ValueBitSet.newInstance(lhsSchema);
+            this.rhsBitSet = ValueBitSet.newInstance(rhsSchema);
+            lhsBitSet.clear();
+            int len = lhsBitSet.getEstimatedLength();
+            this.emptyProjectedValue = new byte[len];
+            lhsBitSet.toBytes(emptyProjectedValue, 0);
+            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
+                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+            this.queue = new MappedByteBufferTupleQueue(thresholdBytes);
+            this.queueIterator = null;
+        }
+        
+        @Override
+        public void close() throws SQLException {
+            lhsIterator.close();
+            rhsIterator.close();
+            queue.close();
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            if (!initialized) {
+                init();
+            }
+
+            Tuple next = null;
+            while (next == null && !isEnd()) {
+                if (queueIterator != null) {
+                    if (queueIterator.hasNext()) {
+                        next = join(lhsTuple, queueIterator.next());
+                    } else {
+                        boolean eq = nextLhsTuple != null && lhsKey.equals(nextLhsKey);
+                        advance(true);
+                        if (eq) {
+                            queueIterator = queue.iterator();
+                        } else {
+                            queue.clear();
+                            queueIterator = null;
+                        }
+                    }
+                } else if (lhsTuple != null) {
+                    if (rhsTuple != null) {
+                        if (lhsKey.equals(rhsKey)) {
+                            next = join(lhsTuple, rhsTuple);
+                            if (nextLhsTuple != null && lhsKey.equals(nextLhsKey)) {
+                                queue.offer(rhsTuple);
+                                if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) {
+                                    queueIterator = queue.iterator();
+                                    advance(true);
+                                }
+                            } else if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) {
+                                advance(true);
+                            }
+                            advance(false);
+                        } else if (lhsKey.compareTo(rhsKey) < 0) {
+                            if (type == JoinType.Full || type == JoinType.Left) {
+                                next = join(lhsTuple, null);
+                            }
+                            advance(true);
+                        } else {
+                            if (type == JoinType.Full) {
+                                next = join(null, rhsTuple);
+                            }
+                            advance(false);
+                        }
+                    } else { // left-join or full-join
+                        next = join(lhsTuple, null);
+                        advance(true);
+                    }
+                } else { // full-join
+                    next = join(null, rhsTuple);
+                    advance(false);
+                }
+            }
+
+            return next;
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {
+        }
+        
+        private void init() throws SQLException {
+            nextLhsTuple = lhsIterator.next();
+            if (nextLhsTuple != null) {
+                nextLhsKey.evaluate(nextLhsTuple);
+            }
+            advance(true);
+            nextRhsTuple = rhsIterator.next();
+            if (nextRhsTuple != null) {
+                nextRhsKey.evaluate(nextRhsTuple);
+            }
+            advance(false);
+            initialized = true;
+        }
+        
+        private void advance(boolean lhs) throws SQLException {
+            if (lhs) {
+                lhsTuple = nextLhsTuple;
+                lhsKey.set(nextLhsKey);
+                if (lhsTuple != null) {
+                    nextLhsTuple = lhsIterator.next();
+                    if (nextLhsTuple != null) {
+                        nextLhsKey.evaluate(nextLhsTuple);
+                    } else {
+                        nextLhsKey.clear();
+                    }
+                }
+            } else {
+                rhsTuple = nextRhsTuple;
+                rhsKey.set(nextRhsKey);
+                if (rhsTuple != null) {
+                    nextRhsTuple = rhsIterator.next();
+                    if (nextRhsTuple != null) {
+                        nextRhsKey.evaluate(nextRhsTuple);
+                    } else {
+                        nextRhsKey.clear();
+                    }
+                }                    
+            }
+        }
+        
+        private boolean isEnd() {
+            return (lhsTuple == null && (rhsTuple == null || type != JoinType.Full))
+                    || (queueIterator == null && rhsTuple == null && type == JoinType.Inner);
+        }        
+        
+        private Tuple join(Tuple lhs, Tuple rhs) throws SQLException {
+            try {
+                ProjectedValueTuple t = null;
+                if (lhs == null) {
+                    t = new ProjectedValueTuple(rhs, rhs.getValue(0).getTimestamp(), 
+                            this.emptyProjectedValue, 0, this.emptyProjectedValue.length, 
+                            this.emptyProjectedValue.length);
+                } else if (lhs instanceof ProjectedValueTuple) {
+                    t = (ProjectedValueTuple) lhs;
+                } else {
+                    ImmutableBytesWritable ptr = context.getTempPtr();
+                    TupleProjector.decodeProjectedValue(lhs, ptr);
+                    lhsBitSet.clear();
+                    lhsBitSet.or(ptr);
+                    int bitSetLen = lhsBitSet.getEstimatedLength();
+                    t = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(), 
+                            ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen);
+                    
+                }
+                return rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ?
+                        t : TupleProjector.mergeProjectedValue(
+                                t, joinedSchema, destBitSet,
+                                rhs, rhsSchema, rhsBitSet, rhsFieldPosition);
+            } catch (IOException e) {
+                throw new SQLException(e);
+            }
+        }
+    }
+    
+    private class SemiAntiJoinIterator implements ResultIterator {
+        private final ResultIterator lhsIterator;
+        private final ResultIterator rhsIterator;
+        private final boolean isSemi;
+        private boolean initialized;
+        private Tuple lhsTuple;
+        private Tuple rhsTuple;
+        private JoinKey lhsKey;
+        private JoinKey rhsKey;
+        
+        public SemiAntiJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
+            if (type != JoinType.Semi && type != JoinType.Anti) throw new IllegalArgumentException("Type " + type + " is not allowed by " + SemiAntiJoinIterator.class.getName());
+            this.lhsIterator = lhsIterator;
+            this.rhsIterator = rhsIterator;
+            this.isSemi = type == JoinType.Semi;
+            this.initialized = false;
+            this.lhsTuple = null;
+            this.rhsTuple = null;
+            this.lhsKey = new JoinKey(lhsKeyExpressions);
+            this.rhsKey = new JoinKey(rhsKeyExpressions);
+        }
+
+        @Override
+        public void close() throws SQLException {
+            lhsIterator.close();
+            rhsIterator.close();
+        }
+
+        @Override
+        public Tuple next() throws SQLException {
+            if (!initialized) {
+                advance(true);
+                advance(false);
+                initialized = true;
+            }
+            
+            Tuple next = null;            
+            while (lhsTuple != null && next == null) {
+                if (rhsTuple != null) {
+                    if (lhsKey.equals(rhsKey)) {
+                        if (isSemi) {
+                            next = lhsTuple;
+                        }
+                        advance(true);
+                    } else if (lhsKey.compareTo(rhsKey) < 0) {
+                        if (!isSemi) {
+                            next = lhsTuple;
+                        }
+                        advance(true);
+                    } else {
+                        advance(false);
+                    }
+                } else {
+                    if (!isSemi) {
+                        next = lhsTuple;
+                    }
+                    advance(true);
+                }
+            }
+            
+            return next;
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {
+        }
+        
+        private void advance(boolean lhs) throws SQLException {
+            if (lhs) {
+                lhsTuple = lhsIterator.next();
+                if (lhsTuple != null) {
+                    lhsKey.evaluate(lhsTuple);
+                } else {
+                    lhsKey.clear();
+                }
+            } else {
+                rhsTuple = rhsIterator.next();
+                if (rhsTuple != null) {
+                    rhsKey.evaluate(rhsTuple);
+                } else {
+                    rhsKey.clear();
+                }
+            }
+        }
+    }
+    
+    private static class JoinKey implements Comparable<JoinKey> {
+        private final List<Expression> expressions;
+        private final List<ImmutableBytesWritable> keys;
+        
+        public JoinKey(List<Expression> expressions) {
+            this.expressions = expressions;
+            this.keys = Lists.newArrayListWithExpectedSize(expressions.size());
+            for (int i = 0; i < expressions.size(); i++) {
+                this.keys.add(new ImmutableBytesWritable());
+            }
+        }
+        
+        public void evaluate(Tuple tuple) {
+            for (int i = 0; i < keys.size(); i++) {
+                if (!expressions.get(i).evaluate(tuple, keys.get(i))) {
+                    keys.get(i).set(EMPTY_PTR);
+                }
+            }
+        }
+        
+        public void set(JoinKey other) {
+            for (int i = 0; i < keys.size(); i++) {
+                ImmutableBytesWritable key = other.keys.get(i);
+                this.keys.get(i).set(key.get(), key.getOffset(), key.getLength());
+            }            
+        }
+        
+        public void clear() {
+            for (int i = 0; i < keys.size(); i++) {
+                this.keys.get(i).set(EMPTY_PTR);
+            }            
+        }
+        
+        @Override
+        public boolean equals(Object other) {
+            if (!(other instanceof JoinKey)) 
+                return false;
+            return this.compareTo((JoinKey) other) == 0;
+        }
+        
+        @Override
+        public int compareTo(JoinKey other) {
+            for (int i = 0; i < keys.size(); i++) {
+                int comp = this.keys.get(i).compareTo(other.keys.get(i));
+                if (comp != 0) 
+                    return comp; 
+            }
+            
+            return 0;
+        }
+    }
+    
+    private static class MappedByteBufferTupleQueue extends MappedByteBufferQueue<Tuple> {
+
+        public MappedByteBufferTupleQueue(int thresholdBytes) {
+            super(thresholdBytes);
+        }
+
+        @Override
+        protected MappedByteBufferSegmentQueue<Tuple> createSegmentQueue(
+                int index, int thresholdBytes) {
+            return new MappedByteBufferTupleSegmentQueue(index, thresholdBytes, false);
+        }
+
+        @Override
+        protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() {
+            return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() {
+                @Override
+                public int compare(MappedByteBufferSegmentQueue<Tuple> q1, 
+                        MappedByteBufferSegmentQueue<Tuple> q2) {
+                    return q1.index() - q2.index();
+                }                
+            };
+        }
+
+        @Override
+        public Iterator<Tuple> iterator() {
+            return new Iterator<Tuple>() {
+                private Iterator<MappedByteBufferSegmentQueue<Tuple>> queueIter;
+                private Iterator<Tuple> currentIter;
+                {
+                    this.queueIter = getSegmentQueues().iterator();
+                    this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
+                }
+                
+                @Override
+                public boolean hasNext() {
+                    return currentIter != null && currentIter.hasNext();
+                }
+
+                @Override
+                public Tuple next() {
+                    if (!hasNext())
+                        return null;
+                    
+                    Tuple ret = currentIter.next();                    
+                    if (!currentIter.hasNext()) {
+                        this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;                       
+                    }
+                    
+                    return ret;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+                
+            };
+        }
+        
+        private static class MappedByteBufferTupleSegmentQueue extends MappedByteBufferSegmentQueue<Tuple> {
+            private LinkedList<Tuple> results;
+            
+            public MappedByteBufferTupleSegmentQueue(int index,
+                    int thresholdBytes, boolean hasMaxQueueSize) {
+                super(index, thresholdBytes, hasMaxQueueSize);
+                this.results = Lists.newLinkedList();
+            }
+
+            @Override
+            protected Queue<Tuple> getInMemoryQueue() {
+                return results;
+            }
+
+            @Override
+            protected int sizeOf(Tuple e) {
+                KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0));
+                return Bytes.SIZEOF_INT * 2 + kv.getLength();
+            }
+
+            @SuppressWarnings("deprecation")
+            @Override
+            protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) {
+                KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0));
+                buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT);
+                buffer.putInt(kv.getLength());
+                buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength());
+            }
+
+            @Override
+            protected Tuple readFromBuffer(MappedByteBuffer buffer) {
+                int length = buffer.getInt();
+                if (length < 0)
+                    return null;
+                
+                byte[] b = new byte[length];
+                buffer.get(b);
+                Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
+                return new ResultTuple(result);
+            }
+            
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
index 346a9fd..77682e4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java
@@ -152,14 +152,21 @@ public class TupleProjector {
     public static class ProjectedValueTuple extends BaseTuple {
         private ImmutableBytesWritable keyPtr = new ImmutableBytesWritable();
         private long timestamp;
-        private byte[] projectedValue;
+        private ImmutableBytesWritable projectedValue = new ImmutableBytesWritable();
         private int bitSetLen;
         private KeyValue keyValue;
 
-        private ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int bitSetLen) {
+        public ProjectedValueTuple(Tuple keyBase, long timestamp, byte[] projectedValue, int valueOffset, int valueLength, int bitSetLen) {
+            keyBase.getKey(this.keyPtr);
+            this.timestamp = timestamp;
+            this.projectedValue.set(projectedValue, valueOffset, valueLength);
+            this.bitSetLen = bitSetLen;
+        }
+
+        public ProjectedValueTuple(byte[] keyBuffer, int keyOffset, int keyLength, long timestamp, byte[] projectedValue, int valueOffset, int valueLength, int bitSetLen) {
             this.keyPtr.set(keyBuffer, keyOffset, keyLength);
             this.timestamp = timestamp;
-            this.projectedValue = projectedValue;
+            this.projectedValue.set(projectedValue, valueOffset, valueLength);
             this.bitSetLen = bitSetLen;
         }
         
@@ -171,7 +178,7 @@ public class TupleProjector {
             return timestamp;
         }
         
-        public byte[] getProjectedValue() {
+        public ImmutableBytesWritable getProjectedValue() {
             return projectedValue;
         }
         
@@ -196,7 +203,7 @@ public class TupleProjector {
         public KeyValue getValue(byte[] family, byte[] qualifier) {
             if (keyValue == null) {
                 keyValue = KeyValueUtil.newKeyValue(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), 
-                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue, 0, projectedValue.length);
+                        VALUE_COLUMN_FAMILY, VALUE_COLUMN_QUALIFIER, timestamp, projectedValue.get(), projectedValue.getOffset(), projectedValue.getLength());
             }
             return keyValue;
         }
@@ -204,7 +211,7 @@ public class TupleProjector {
         @Override
         public boolean getValue(byte[] family, byte[] qualifier,
                 ImmutableBytesWritable ptr) {
-            ptr.set(projectedValue);
+            ptr.set(projectedValue.get(), projectedValue.getOffset(), projectedValue.getLength());
             return true;
         }
 
@@ -222,7 +229,7 @@ public class TupleProjector {
     public ProjectedValueTuple projectResults(Tuple tuple) {
     	byte[] bytesValue = schema.toBytes(tuple, getExpressions(), valueSet, ptr);
     	Cell base = tuple.getValue(0);
-        return new ProjectedValueTuple(base.getRowArray(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, valueSet.getEstimatedLength());
+        return new ProjectedValueTuple(base.getRowArray(), base.getRowOffset(), base.getRowLength(), base.getTimestamp(), bytesValue, 0, bytesValue.length, valueSet.getEstimatedLength());
     }
     
     public static void decodeProjectedValue(Tuple tuple, ImmutableBytesWritable ptr) throws IOException {
@@ -233,27 +240,33 @@ public class TupleProjector {
     
     public static ProjectedValueTuple mergeProjectedValue(ProjectedValueTuple dest, KeyValueSchema destSchema, ValueBitSet destBitSet,
     		Tuple src, KeyValueSchema srcSchema, ValueBitSet srcBitSet, int offset) throws IOException {
-    	ImmutableBytesWritable destValue = new ImmutableBytesWritable(dest.getProjectedValue());
+    	ImmutableBytesWritable destValue = dest.getProjectedValue();
+        int origDestBitSetLen = dest.getBitSetLength();
     	destBitSet.clear();
-    	destBitSet.or(destValue);
-    	int origDestBitSetLen = dest.getBitSetLength();
-    	ImmutableBytesWritable srcValue = new ImmutableBytesWritable();
-    	decodeProjectedValue(src, srcValue);
-    	srcBitSet.clear();
-    	srcBitSet.or(srcValue);
-    	int origSrcBitSetLen = srcBitSet.getEstimatedLength();
-    	for (int i = 0; i < srcBitSet.getMaxSetBit(); i++) {
-    		if (srcBitSet.get(i)) {
-    			destBitSet.set(offset + i);
-    		}
+    	destBitSet.or(destValue, origDestBitSetLen);
+    	ImmutableBytesWritable srcValue = null;
+    	int srcValueLen = 0;
+    	if (src != null) {
+    	    srcValue = new ImmutableBytesWritable();
+    	    decodeProjectedValue(src, srcValue);
+    	    srcBitSet.clear();
+    	    srcBitSet.or(srcValue);
+    	    int origSrcBitSetLen = srcBitSet.getEstimatedLength();
+    	    for (int i = 0; i <= srcBitSet.getMaxSetBit(); i++) {
+    	        if (srcBitSet.get(i)) {
+    	            destBitSet.set(offset + i);
+    	        }
+    	    }
+    	    srcValueLen = srcValue.getLength() - origSrcBitSetLen;
     	}
     	int destBitSetLen = destBitSet.getEstimatedLength();
-    	byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValue.getLength() - origSrcBitSetLen + destBitSetLen];
+    	byte[] merged = new byte[destValue.getLength() - origDestBitSetLen + srcValueLen + destBitSetLen];
     	int o = Bytes.putBytes(merged, 0, destValue.get(), destValue.getOffset(), destValue.getLength() - origDestBitSetLen);
-    	o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValue.getLength() - origSrcBitSetLen);
+    	if (src != null) {
+    	    o = Bytes.putBytes(merged, o, srcValue.get(), srcValue.getOffset(), srcValueLen);
+    	}
     	destBitSet.toBytes(merged, o);
-    	ImmutableBytesWritable keyPtr = dest.getKeyPtr();
-        return new ProjectedValueTuple(keyPtr.get(), keyPtr.getOffset(), keyPtr.getLength(), dest.getTimestamp(), merged, destBitSetLen);
+        return new ProjectedValueTuple(dest, dest.getTimestamp(), merged, 0, merged.length, destBitSetLen);
     }
 
     public KeyValueSchema getSchema() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
index 0c68a20..6c332fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java
@@ -59,7 +59,8 @@ public class FilterResultIterator  extends LookAheadResultIterator {
         Tuple next;
         do {
             next = delegate.next();
-        } while (next != null && expression.evaluate(next, ptr) && Boolean.FALSE.equals(expression.getDataType().toObject(ptr)));
+            expression.reset();
+        } while (next != null && (!expression.evaluate(next, ptr) || Boolean.FALSE.equals(expression.getDataType().toObject(ptr))));
         return next;
     }
     

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
new file mode 100644
index 0000000..8ada952
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.AbstractQueue;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.MinMaxPriorityQueue;
+
+public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> {
+    private final int thresholdBytes;
+    private List<MappedByteBufferSegmentQueue<T>> queues;
+    private int currentIndex;
+    private MappedByteBufferSegmentQueue<T> currentQueue;
+    private MinMaxPriorityQueue<MappedByteBufferSegmentQueue<T>> mergedQueue;
+
+    public MappedByteBufferQueue(int thresholdBytes) {
+        this.thresholdBytes = thresholdBytes;
+        this.queues = Lists.<MappedByteBufferSegmentQueue<T>> newArrayList();
+        this.currentIndex = -1;
+        this.currentQueue = null;
+        this.mergedQueue = null;
+    }
+    
+    abstract protected MappedByteBufferSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes);
+    
+    abstract protected Comparator<MappedByteBufferSegmentQueue<T>> getSegmentQueueComparator();
+    
+    protected final List<MappedByteBufferSegmentQueue<T>> getSegmentQueues() {
+        return queues.subList(0, currentIndex + 1);
+    }
+
+    @Override
+    public boolean offer(T e) {
+        boolean startNewQueue = this.currentQueue == null || this.currentQueue.isFlushed();
+        if (startNewQueue) {
+            currentIndex++;
+            if (currentIndex < queues.size()) {
+                currentQueue = queues.get(currentIndex);
+            } else {
+                currentQueue = createSegmentQueue(currentIndex, thresholdBytes);
+                queues.add(currentQueue);
+            }
+        }
+
+        return this.currentQueue.offer(e);
+    }
+
+    @Override
+    public T poll() {
+        initMergedQueue();
+        if (mergedQueue != null && !mergedQueue.isEmpty()) {
+            MappedByteBufferSegmentQueue<T> queue = mergedQueue.poll();
+            T re = queue.poll();
+            if (queue.peek() != null) {
+                mergedQueue.add(queue);
+            }
+            return re;
+        }
+        return null;
+    }
+
+    @Override
+    public T peek() {
+        initMergedQueue();
+        if (mergedQueue != null && !mergedQueue.isEmpty()) {
+            return mergedQueue.peek().peek();
+        }
+        return null;
+    }
+    
+    @Override
+    public void clear() {
+        for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+            queue.clear();
+        }
+        currentIndex = -1;
+        currentQueue = null;
+        mergedQueue = null;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int size() {
+        int size = 0;
+        for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+            size += queue.size();
+        }
+        return size;
+    }
+    
+    public long getByteSize() {
+        return currentQueue == null ? 0 : currentQueue.getInMemByteSize();
+    }
+
+    public void close() {
+        for (MappedByteBufferSegmentQueue<T> queue : queues) {
+            queue.close();
+        }
+        queues.clear();
+    }
+    
+    private void initMergedQueue() {
+        if (mergedQueue == null && currentIndex >= 0) {
+            mergedQueue = MinMaxPriorityQueue.<MappedByteBufferSegmentQueue<T>> orderedBy(
+                    getSegmentQueueComparator()).maximumSize(currentIndex + 1).create();
+            for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) {
+                T re = queue.peek();
+                if (re != null) {
+                    mergedQueue.add(queue);
+                }
+            }
+        }        
+    }
+
+    public abstract static class MappedByteBufferSegmentQueue<T> extends AbstractQueue<T> {
+        protected static final int EOF = -1;
+        // at least create 128 KB MappedByteBuffers
+        private static final long DEFAULT_MAPPING_SIZE = 128 * 1024;
+        
+        private final int index;
+        private final int thresholdBytes;
+        private final boolean hasMaxQueueSize;
+        private long totalResultSize = 0;
+        private int maxResultSize = 0;
+        private long mappingSize = 0;
+        private File file;
+        private boolean isClosed = false;
+        private boolean flushBuffer = false;
+        private int flushedCount = 0;
+        private T current = null;
+        private SegmentQueueFileIterator thisIterator;
+        // iterators to close on close()
+        private List<SegmentQueueFileIterator> iterators;
+
+        public MappedByteBufferSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
+            this.index = index;
+            this.thresholdBytes = thresholdBytes;
+            this.hasMaxQueueSize = hasMaxQueueSize;
+            this.iterators = Lists.<SegmentQueueFileIterator> newArrayList();
+        }
+        
+        abstract protected Queue<T> getInMemoryQueue();
+        abstract protected int sizeOf(T e);
+        abstract protected void writeToBuffer(MappedByteBuffer buffer, T e);
+        abstract protected T readFromBuffer(MappedByteBuffer buffer);
+        
+        public int index() {
+            return this.index;
+        }
+        
+        public int size() {
+            if (flushBuffer)
+                return flushedCount;
+            return getInMemoryQueue().size();
+        }
+        
+        public long getInMemByteSize() {
+            if (flushBuffer)
+                return 0;
+            return totalResultSize;
+        }
+        
+        public boolean isFlushed() {
+            return flushBuffer;
+        }
+
+        @Override
+        public boolean offer(T e) {
+            if (isClosed || flushBuffer)
+                return false;
+            
+            boolean added = getInMemoryQueue().add(e);
+            if (added) {
+                try {
+                    flush(e);
+                } catch (IOException ex) {
+                    throw new RuntimeException(ex);
+                }
+            }
+            
+            return added;
+        }
+        
+        @Override
+        public T peek() {
+            if (current == null && !isClosed) {
+                current = next();
+            }
+            
+            return current;
+        }
+        
+        @Override
+        public T poll() {
+            T ret = peek();
+            if (!isClosed) {
+                current = next();
+            } else {
+                current = null;
+            }
+            
+            return ret;
+        }
+
+        @Override
+        public Iterator<T> iterator() {
+            if (isClosed)
+                return null;
+            
+            if (!flushBuffer)
+                return getInMemoryQueue().iterator();
+            
+            SegmentQueueFileIterator iterator = new SegmentQueueFileIterator(thisIterator);
+            iterators.add(iterator);
+            return iterator;
+        }
+
+        @Override
+        public void clear() {
+            getInMemoryQueue().clear();
+            this.totalResultSize = 0;
+            this.maxResultSize = 0;
+            this.mappingSize = 0;
+            this.flushBuffer = false;
+            this.flushedCount = 0;
+            this.current = null;
+            if (thisIterator != null) {
+                thisIterator.close();
+                thisIterator = null;
+            }
+            for (SegmentQueueFileIterator iter : iterators) {
+                iter.close();
+            }
+            iterators.clear();
+            if (this.file != null) {
+                file.delete();
+                file = null;
+            }
+        }
+        
+        public void close() {
+            if (!isClosed) {
+                clear();
+                this.isClosed = true;
+            }
+        }
+        
+        private T next() {
+            T ret = null;            
+            if (!flushBuffer) {
+                ret = getInMemoryQueue().poll();
+            } else {
+                if (thisIterator == null) {
+                    thisIterator = new SegmentQueueFileIterator();
+                }
+                ret = thisIterator.next();
+            }
+            
+            if (ret == null) {
+                close();
+            }
+            
+            return ret;
+        }
+
+        private void flush(T entry) throws IOException {
+            Queue<T> inMemQueue = getInMemoryQueue();
+            int resultSize = sizeOf(entry);
+            maxResultSize = Math.max(maxResultSize, resultSize);
+            totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
+            if (totalResultSize >= thresholdBytes) {
+                this.file = File.createTempFile(UUID.randomUUID().toString(), null);
+                RandomAccessFile af = new RandomAccessFile(file, "rw");
+                FileChannel fc = af.getChannel();
+                int writeIndex = 0;
+                mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
+                MappedByteBuffer writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+
+                int resSize = inMemQueue.size();
+                for (int i = 0; i < resSize; i++) {                
+                    T e = inMemQueue.poll();
+                    writeToBuffer(writeBuffer, e);
+                    // buffer close to exhausted, re-map.
+                    if (mappingSize - writeBuffer.position() < maxResultSize) {
+                        writeIndex += writeBuffer.position();
+                        writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
+                    }
+                }
+                writeBuffer.putInt(EOF); // end
+                fc.force(true);
+                fc.close();
+                af.close();
+                flushedCount = resSize;
+                inMemQueue.clear();
+                flushBuffer = true;
+            }
+        }
+        
+        private class SegmentQueueFileIterator implements Iterator<T>, Closeable {
+            private boolean isEnd;
+            private long readIndex;
+            private RandomAccessFile af;
+            private FileChannel fc;
+            private MappedByteBuffer readBuffer;
+            private T next;
+            
+            public SegmentQueueFileIterator() {
+                init(0);
+            }
+            
+            public SegmentQueueFileIterator(SegmentQueueFileIterator iterator) {
+                if (iterator != null && iterator.isEnd) {
+                    this.isEnd = true;
+                } else {
+                    init(iterator == null ? 0 : iterator.readIndex);
+                }
+            }
+            
+            private void init(long readIndex) {
+                this.isEnd = false;
+                this.readIndex = readIndex;
+                this.next = null;
+                try {
+                    this.af = new RandomAccessFile(file, "r");
+                    this.fc = af.getChannel();
+                    this.readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                if (!isEnd && next == null) {
+                    next = readNext();
+                }
+                
+                return next != null;
+            }
+
+            @Override
+            public T next() {
+                if (!hasNext())
+                    return null;
+                
+                T ret = next;
+                next = readNext();
+                return ret;
+            }
+            
+            private T readNext() {
+                if (isEnd)
+                    return null;
+                
+                T e = readFromBuffer(readBuffer);
+                if (e == null) {
+                    close();
+                    return null;
+                }
+                
+                // buffer close to exhausted, re-map.
+                if (mappingSize - readBuffer.position() < maxResultSize) {
+                    readIndex += readBuffer.position();
+                    try {
+                        readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
+                    } catch (IOException ex) {
+                        throw new RuntimeException(ex);
+                    }
+                }
+                
+                return e;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void close() {
+                this.isEnd = true;
+                if (this.fc != null) {
+                    try {
+                        this.fc.close();
+                    } catch (IOException ignored) {
+                    }
+                }
+                if (this.af != null) {
+                    try {
+                        this.af.close();
+                    } catch (IOException ignored) {
+                    }
+                    this.af = null;
+                }
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
index cd31ff7..ae2f452 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java
@@ -17,19 +17,12 @@
  */
 package org.apache.phoenix.iterate;
 
-import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
-import java.util.AbstractQueue;
 import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
-import java.util.UUID;
-
+import java.util.Queue;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -41,182 +34,105 @@ import org.apache.phoenix.util.ResultUtil;
 
 import com.google.common.collect.MinMaxPriorityQueue;
 
-public class MappedByteBufferSortedQueue extends AbstractQueue<ResultEntry> {
+public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEntry> {
     private Comparator<ResultEntry> comparator;
     private final int limit;
-    private final int thresholdBytes;
-    private List<MappedByteBufferPriorityQueue> queues = new ArrayList<MappedByteBufferPriorityQueue>();
-    private MappedByteBufferPriorityQueue currentQueue = null;
-    private int currentIndex = 0;
-    MinMaxPriorityQueue<IndexedResultEntry> mergedQueue = null;
 
     public MappedByteBufferSortedQueue(Comparator<ResultEntry> comparator,
             Integer limit, int thresholdBytes) throws IOException {
+        super(thresholdBytes);
         this.comparator = comparator;
         this.limit = limit == null ? -1 : limit;
-        this.thresholdBytes = thresholdBytes;
-        this.currentQueue = new MappedByteBufferPriorityQueue(0,
-                this.limit, thresholdBytes, comparator);
-        this.queues.add(currentQueue);
-    }
-
-    @Override
-    public boolean offer(ResultEntry e) {
-        try {
-            boolean isFlush = this.currentQueue.writeResult(e);
-            if (isFlush) {
-                currentIndex++;
-                currentQueue = new MappedByteBufferPriorityQueue(currentIndex,
-                        limit, thresholdBytes, comparator);
-                queues.add(currentQueue);
-            }
-        } catch (IOException ioe) {
-            throw new RuntimeException(ioe);
-        }
-
-        return true;
-    }
-
-    @Override
-    public ResultEntry poll() {
-        if (mergedQueue == null) {
-            mergedQueue = MinMaxPriorityQueue.<ResultEntry> orderedBy(
-                    comparator).maximumSize(queues.size()).create();
-            for (MappedByteBufferPriorityQueue queue : queues) {
-                try {
-                    IndexedResultEntry next = queue.getNextResult();
-                    if (next != null) {
-                        mergedQueue.add(next);
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        if (!mergedQueue.isEmpty()) {
-            IndexedResultEntry re = mergedQueue.pollFirst();
-            if (re != null) {
-                IndexedResultEntry next = null;
-                try {
-                    next = queues.get(re.getIndex()).getNextResult();
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-                if (next != null) {
-                    mergedQueue.add(next);
-                }
-                return re;
-            }
-        }
-        return null;
     }
 
     @Override
-    public ResultEntry peek() {
-        if (mergedQueue == null) {
-            mergedQueue = MinMaxPriorityQueue.<ResultEntry> orderedBy(
-                    comparator).maximumSize(queues.size()).create();
-            for (MappedByteBufferPriorityQueue queue : queues) {
-                try {
-                    IndexedResultEntry next = queue.getNextResult();
-                    if (next != null) {
-                        mergedQueue.add(next);
-                    }
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        if (!mergedQueue.isEmpty()) {
-            IndexedResultEntry re = mergedQueue.peekFirst();
-            if (re != null) {
-                return re;
-            }
-        }
-        return null;
+    protected org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry> createSegmentQueue(
+            int index, int thresholdBytes) {
+        return new MappedByteBufferResultEntryPriorityQueue(index, thresholdBytes, limit, comparator);
     }
 
     @Override
-    public Iterator<ResultEntry> iterator() {
-        throw new UnsupportedOperationException();
+    protected Comparator<org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry>> getSegmentQueueComparator() {
+        return new Comparator<MappedByteBufferSegmentQueue<ResultEntry>>() {
+            @Override
+            public int compare(MappedByteBufferSegmentQueue<ResultEntry> q1,
+                    MappedByteBufferSegmentQueue<ResultEntry> q2) {
+                return comparator.compare(q1.peek(), q2.peek());
+            }};
     }
 
-    @Override
-    public int size() {
-        int size = 0;
-        for (MappedByteBufferPriorityQueue queue : queues) {
-            size += queue.size();
+    private static class MappedByteBufferResultEntryPriorityQueue extends MappedByteBufferSegmentQueue<ResultEntry> {    	
+        private MinMaxPriorityQueue<ResultEntry> results = null;
+        
+    	public MappedByteBufferResultEntryPriorityQueue(int index,
+                int thresholdBytes, int limit, Comparator<ResultEntry> comparator) {
+            super(index, thresholdBytes, limit >= 0);
+            this.results = limit < 0 ? 
+                    MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
+                  : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit).create();
         }
-        return size;
-    }
-    
-    public long getByteSize() {
-        return currentQueue.getInMemByteSize();
-    }
 
-    public void close() {
-        if (queues != null) {
-            for (MappedByteBufferPriorityQueue queue : queues) {
-                queue.close();
-            }
+        @Override
+        protected Queue<ResultEntry> getInMemoryQueue() {
+            return results;
         }
-    }
-
-    private static class IndexedResultEntry extends ResultEntry {
-        private int index;
 
-        public IndexedResultEntry(int index, ResultEntry resultEntry) {
-            super(resultEntry.sortKeys, resultEntry.result);
-            this.index = index;
+        @Override
+        protected int sizeOf(ResultEntry e) {
+            return sizeof(e.sortKeys) + sizeof(toKeyValues(e));
         }
 
-        public int getIndex() {
-            return this.index;
+        @SuppressWarnings("deprecation")
+        @Override
+        protected void writeToBuffer(MappedByteBuffer buffer, ResultEntry e) {
+            int totalLen = 0;
+            List<KeyValue> keyValues = toKeyValues(e);
+            for (KeyValue kv : keyValues) {
+                totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
+            }
+            buffer.putInt(totalLen);
+            for (KeyValue kv : keyValues) {
+                buffer.putInt(kv.getLength());
+                buffer.put(kv.getBuffer(), kv.getOffset(), kv
+                        .getLength());
+            }
+            ImmutableBytesWritable[] sortKeys = e.sortKeys;
+            buffer.putInt(sortKeys.length);
+            for (ImmutableBytesWritable sortKey : sortKeys) {
+                if (sortKey != null) {
+                    buffer.putInt(sortKey.getLength());
+                    buffer.put(sortKey.get(), sortKey.getOffset(),
+                            sortKey.getLength());
+                } else {
+                    buffer.putInt(0);
+                }
+            }
         }
-    }
-
-    private static class MappedByteBufferPriorityQueue {
-    	// at least create 128 KB MappedByteBuffers
-        private static final long DEFAULT_MAPPING_SIZE = 128 * 1024;
-        
-        private final int limit;
-        private final int thresholdBytes;
-        private long totalResultSize = 0;
-        private int maxResultSize = 0;
-        private long mappingSize = 0;
-        private long writeIndex = 0;
-        private long readIndex = 0;
-        private MappedByteBuffer writeBuffer;
-        private MappedByteBuffer readBuffer;
-        private FileChannel fc;
-        private RandomAccessFile af;
-        private File file;
-        private boolean isClosed = false;
-        MinMaxPriorityQueue<ResultEntry> results = null;
-        private boolean flushBuffer = false;
-        private int index;
-        private int flushedCount;
 
-        public MappedByteBufferPriorityQueue(int index, int limit, int thresholdBytes,
-                Comparator<ResultEntry> comparator) throws IOException {
-            this.index = index;
-            this.limit = limit;
-            this.thresholdBytes = thresholdBytes;
-            results = limit < 0 ? 
-                    MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
-                  : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit).create();
-        }
-        
-        public int size() {
-            if (flushBuffer)
-                return flushedCount;
-            return results.size();
-        }
-        
-        public long getInMemByteSize() {
-            if (flushBuffer)
-                return 0;
-            return totalResultSize;
+        @Override
+        protected ResultEntry readFromBuffer(MappedByteBuffer buffer) {            
+            int length = buffer.getInt();
+            if (length < 0)
+                return null;
+            
+            byte[] rb = new byte[length];
+            buffer.get(rb);
+            Result result = ResultUtil.toResult(new ImmutableBytesWritable(rb));
+            ResultTuple rt = new ResultTuple(result);
+            int sortKeySize = buffer.getInt();
+            ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize];
+            for (int i = 0; i < sortKeySize; i++) {
+                int contentLength = buffer.getInt();
+                if (contentLength > 0) {
+                    byte[] sortKeyContent = new byte[contentLength];
+                    buffer.get(sortKeyContent);
+                    sortKeys[i] = new ImmutableBytesWritable(sortKeyContent);
+                } else {
+                    sortKeys[i] = null;
+                }
+            }
+            
+            return new ResultEntry(sortKeys, rt);
         }
 
         private List<KeyValue> toKeyValues(ResultEntry entry) {
@@ -252,139 +168,5 @@ public class MappedByteBufferSortedQueue extends AbstractQueue<ResultEntry> {
             }
             return size;
         }
-
-        @SuppressWarnings("deprecation")
-        public boolean writeResult(ResultEntry entry) throws IOException {
-            if (flushBuffer)
-                throw new IOException("Results already flushed");
-            
-            int sortKeySize = sizeof(entry.sortKeys);
-            int resultSize = sizeof(toKeyValues(entry)) + sortKeySize;
-            boolean added = results.add(entry);
-            if (added) {
-                maxResultSize = Math.max(maxResultSize, resultSize);
-                totalResultSize = limit < 0 ? (totalResultSize + resultSize) : maxResultSize * results.size();
-                if (totalResultSize >= thresholdBytes) {
-                    this.file = File.createTempFile(UUID.randomUUID().toString(), null);
-                    this.af = new RandomAccessFile(file, "rw");
-                    this.fc = af.getChannel();
-                    mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize);
-                    writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
-                
-                    int resSize = results.size();
-                    for (int i = 0; i < resSize; i++) {                
-                        int totalLen = 0;
-                        ResultEntry re = results.pollFirst();
-                        List<KeyValue> keyValues = toKeyValues(re);
-                        for (KeyValue kv : keyValues) {
-                            totalLen += (kv.getLength() + Bytes.SIZEOF_INT);
-                        }
-                        writeBuffer.putInt(totalLen);
-                        for (KeyValue kv : keyValues) {
-                            writeBuffer.putInt(kv.getLength());
-                            writeBuffer.put(kv.getBuffer(), kv.getOffset(), kv
-                                    .getLength());
-                        }
-                        ImmutableBytesWritable[] sortKeys = re.sortKeys;
-                        writeBuffer.putInt(sortKeys.length);
-                        for (ImmutableBytesWritable sortKey : sortKeys) {
-                            if (sortKey != null) {
-                                writeBuffer.putInt(sortKey.getLength());
-                                writeBuffer.put(sortKey.get(), sortKey.getOffset(),
-                                        sortKey.getLength());
-                            } else {
-                                writeBuffer.putInt(0);
-                            }
-                        }
-                        // buffer close to exhausted, re-map.
-                        if (mappingSize - writeBuffer.position() < maxResultSize) {
-                            writeIndex += writeBuffer.position();
-                            writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize);
-                        }
-                    }
-                    writeBuffer.putInt(-1); // end
-                    flushedCount = results.size();
-                    results.clear();
-                    flushBuffer = true;
-                }
-            }
-            return flushBuffer;
-        }
-
-        public IndexedResultEntry getNextResult() throws IOException {
-            if (isClosed)
-                return null;
-            
-            if (!flushBuffer) {
-                ResultEntry re = results.poll();
-                if (re == null) {
-                    reachedEnd();
-                    return null;
-                }
-                return new IndexedResultEntry(index, re);
-            }
-            
-            if (readBuffer == null) {
-                readBuffer = this.fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
-            }
-            
-            int length = readBuffer.getInt();
-            if (length < 0) {
-                reachedEnd();
-                return null;
-            }
-            
-            byte[] rb = new byte[length];
-            readBuffer.get(rb);
-            Result result = ResultUtil.toResult(new ImmutableBytesWritable(rb));
-            ResultTuple rt = new ResultTuple(result);
-            int sortKeySize = readBuffer.getInt();
-            ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize];
-            for (int i = 0; i < sortKeySize; i++) {
-                int contentLength = readBuffer.getInt();
-                if (contentLength > 0) {
-                    byte[] sortKeyContent = new byte[contentLength];
-                    readBuffer.get(sortKeyContent);
-                    sortKeys[i] = new ImmutableBytesWritable(sortKeyContent);
-                } else {
-                    sortKeys[i] = null;
-                }
-            }
-            // buffer close to exhausted, re-map.
-            if (mappingSize - readBuffer.position() < maxResultSize) {
-                readIndex += readBuffer.position();
-                readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize);
-            }
-            
-            return new IndexedResultEntry(index, new ResultEntry(sortKeys, rt));
-        }
-
-        private void reachedEnd() {
-            this.isClosed = true;
-            if (this.fc != null) {
-                try {
-                    this.fc.close();
-                } catch (IOException ignored) {
-                }
-                this.fc = null;
-            }
-            if (this.af != null) {
-                try {
-                    this.af.close();
-                } catch (IOException ignored) {
-                }
-                this.af = null;
-            }
-            if (this.file != null) {
-                file.delete();
-                file = null;
-            }
-        }
-
-        public void close() {
-            if (!isClosed) {
-                this.reachedEnd();
-            }
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
index ea20114..068547f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/HintNode.java
@@ -82,7 +82,11 @@ public class HintNode {
         */
        NO_CACHE,
        /**
-        * Avoid using star-join optimization.
+        * Use sort-merge join algorithm instead of broadcast join (hash join) algorithm.
+        */
+       USE_SORT_MERGE_JOIN,
+       /**
+        * Avoid using star-join optimization. Used for broadcast join (hash join) only.
         */
        NO_STAR_JOIN,
        /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index cc0b455..6d3123f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -590,6 +590,11 @@ public class ParseNodeFactory {
         return new DeleteStatement(table, hint, node, orderBy, limit, bindCount);
     }
 
+    public SelectStatement select(SelectStatement statement, ParseNode where) {
+        return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), statement.getHaving(),
+                statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+    }
+
     public SelectStatement select(SelectStatement statement, ParseNode where, ParseNode having) {
         return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), where, statement.getGroupBy(), having,
                 statement.getOrderBy(), statement.getLimit(), statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
@@ -625,6 +630,12 @@ public class ParseNodeFactory {
                 statement.hasSequence());
     }
 
+    public SelectStatement select(SelectStatement statement, List<OrderByNode> orderBy) {
+        return select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(),
+                statement.getWhere(), statement.getGroupBy(), statement.getHaving(), orderBy, statement.getLimit(),
+                statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+    }
+
     public SelectStatement select(SelectStatement statement, HintNode hint) {
         return hint == null || hint.isEmpty() ? statement : select(statement.getFrom(), hint, statement.isDistinct(), statement.getSelect(),
                 statement.getWhere(), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(),

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
index ab06d46..3152abe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/SelectStatementRewriter.java
@@ -52,7 +52,7 @@ public class SelectStatementRewriter extends ParseNodeRewriter {
         SelectStatementRewriter rewriter = new SelectStatementRewriter(removeNodes);
         where = where.accept(rewriter);
         // Return new SELECT statement with updated WHERE clause
-        return NODE_FACTORY.select(statement, where, statement.getHaving());
+        return NODE_FACTORY.select(statement, where);
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java
index 47acdac..7931659 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ValueBitSet.java
@@ -145,10 +145,14 @@ public class ValueBitSet {
     }
     
     public void or(ImmutableBytesWritable ptr) {
-        if (schema == null) {
+        or(ptr, isVarLength() ? Bytes.SIZEOF_SHORT + 1 : Bytes.SIZEOF_SHORT);
+    }
+    
+    public void or(ImmutableBytesWritable ptr, int length) {
+        if (schema == null || length == 0) {
             return;
         }
-        if (isVarLength()) {
+        if (length > Bytes.SIZEOF_SHORT) {
             int offset = ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_SHORT;
             short nLongs = Bytes.toShort(ptr.get(), offset);
             offset -= nLongs * Bytes.SIZEOF_LONG;
@@ -160,7 +164,7 @@ public class ValueBitSet {
         } else {
             long l = Bytes.toShort(ptr.get(), ptr.getOffset() + ptr.getLength() - Bytes.SIZEOF_SHORT);
             bits[0] |= l;
-            maxSetBit = Math.max(maxSetBit, BITS_PER_SHORT - 1);
+            maxSetBit = Math.max(maxSetBit, (bits[0] == 0 ? 0 : BITS_PER_SHORT) - 1);
         }
         
     }
@@ -196,3 +200,4 @@ public class ValueBitSet {
         maxSetBit = Math.max(maxSetBit, isSet.maxSetBit);
     }
 }
+


[2/4] phoenix git commit: PHOENIX-1799 Support many-to-many joins

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
index a561a47..016cd52 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/GroupByCompiler.java
@@ -136,7 +136,7 @@ public class GroupByCompiler {
      * @throws ColumnNotFoundException if column name could not be resolved
      * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
      */
-    public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector) throws SQLException {
+    public static GroupBy compile(StatementContext context, SelectStatement statement, TupleProjector tupleProjector, boolean isInRowKeyOrder) throws SQLException {
         List<ParseNode> groupByNodes = statement.getGroupBy();
         /**
          * Distinct can use an aggregate plan if there's no group by.
@@ -179,7 +179,7 @@ public class GroupByCompiler {
             return GroupBy.EMPTY_GROUP_BY;
         }
         
-        boolean isRowKeyOrderedGrouping = groupByVisitor.isOrderPreserving();
+        boolean isRowKeyOrderedGrouping = isInRowKeyOrder && groupByVisitor.isOrderPreserving();
         List<Expression> expressions = Lists.newArrayListWithCapacity(groupByEntries.size());
         List<Expression> keyExpressions = expressions;
         String groupExprAttribName;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
index b519dc4..45b6603 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.compile;
 import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
 
 import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -293,6 +292,10 @@ public class JoinCompiler {
             return columnRefs;
         }
         
+        public ParseNode getPostFiltersCombined() {
+            return combine(postFilters);
+        }
+        
         public void addFilter(ParseNode filter) throws SQLException {
             if (joinSpecs.isEmpty()) {
                 table.addFilter(filter);
@@ -320,7 +323,7 @@ public class JoinCompiler {
             for (JoinSpec joinSpec : joinSpecs) {
                 JoinTable joinTable = joinSpec.getJoinTable();
                 boolean hasSubJoin = !joinTable.getJoinSpecs().isEmpty();
-                for (ComparisonParseNode node : joinSpec.getOnConditions()) {
+                for (EqualParseNode node : joinSpec.getOnConditions()) {
                     node.getLHS().accept(generalRefVisitor);
                     if (hasSubJoin) {
                         node.getRHS().accept(generalRefVisitor);
@@ -384,13 +387,12 @@ public class JoinCompiler {
         }
         
         public SelectStatement getAsSingleSubquery(SelectStatement query, boolean asSubquery) throws SQLException {
-            if (!isFlat(query))
-                throw new SQLFeatureNotSupportedException("Complex subqueries not supported as left join table.");
+            assert (isFlat(query));
             
             if (asSubquery)
                 return query;
             
-            return NODE_FACTORY.select(query.getFrom(), select.getHint(), select.isDistinct(), select.getSelect(), query.getWhere(), select.getGroupBy(), select.getHaving(), select.getOrderBy(), select.getLimit(), select.getBindCount(), select.isAggregate(), select.hasSequence());            
+            return NODE_FACTORY.select(select, query.getFrom(), query.getWhere());            
         }
         
         public boolean hasPostReference() {
@@ -427,7 +429,7 @@ public class JoinCompiler {
     
     public static class JoinSpec {
         private final JoinType type;
-        private final List<ComparisonParseNode> onConditions;
+        private final List<EqualParseNode> onConditions;
         private final JoinTable joinTable;
         private final boolean singleValueOnly;
         private Set<TableRef> dependencies;
@@ -436,7 +438,7 @@ public class JoinCompiler {
         private JoinSpec(JoinType type, ParseNode onNode, JoinTable joinTable, 
                 boolean singleValueOnly, ColumnResolver resolver) throws SQLException {
             this.type = type;
-            this.onConditions = new ArrayList<ComparisonParseNode>();
+            this.onConditions = new ArrayList<EqualParseNode>();
             this.joinTable = joinTable;
             this.singleValueOnly = singleValueOnly;
             this.dependencies = new HashSet<TableRef>();
@@ -454,7 +456,7 @@ public class JoinCompiler {
             return type;
         }
         
-        public List<ComparisonParseNode> getOnConditions() {
+        public List<EqualParseNode> getOnConditions() {
             return onConditions;
         }
         
@@ -470,75 +472,63 @@ public class JoinCompiler {
             return dependencies;
         }
         
-        public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext context, ColumnResolver leftResolver, ColumnResolver rightResolver) throws SQLException {
+        public Pair<List<Expression>, List<Expression>> compileJoinConditions(StatementContext lhsCtx, StatementContext rhsCtx, boolean sortExpressions) throws SQLException {
             if (onConditions.isEmpty()) {
                 return new Pair<List<Expression>, List<Expression>>(
                         Collections.<Expression> singletonList(LiteralExpression.newConstant(1)), 
                         Collections.<Expression> singletonList(LiteralExpression.newConstant(1)));
             }
             
-            ColumnResolver resolver = context.getResolver();
-            List<Pair<Expression, Expression>> compiled = new ArrayList<Pair<Expression, Expression>>(onConditions.size());
-            context.setResolver(leftResolver);
-            ExpressionCompiler expressionCompiler = new ExpressionCompiler(context);
-            for (ParseNode condition : onConditions) {
-                assert (condition instanceof EqualParseNode);
-                EqualParseNode equalNode = (EqualParseNode) condition;
-                expressionCompiler.reset();
-                Expression left = equalNode.getLHS().accept(expressionCompiler);
-                compiled.add(new Pair<Expression, Expression>(left, null));
-            }
-            context.setResolver(rightResolver);
-            expressionCompiler = new ExpressionCompiler(context);
-            Iterator<Pair<Expression, Expression>> iter = compiled.iterator();
-            for (ParseNode condition : onConditions) {
-                Pair<Expression, Expression> p = iter.next();
-                EqualParseNode equalNode = (EqualParseNode) condition;
-                expressionCompiler.reset();
-                Expression right = equalNode.getRHS().accept(expressionCompiler);
-                Expression left = p.getFirst();
+            List<Pair<Expression, Expression>> compiled = Lists.<Pair<Expression, Expression>> newArrayListWithExpectedSize(onConditions.size());
+            ExpressionCompiler lhsCompiler = new ExpressionCompiler(lhsCtx);
+            ExpressionCompiler rhsCompiler = new ExpressionCompiler(rhsCtx);
+            for (EqualParseNode condition : onConditions) {
+                lhsCompiler.reset();
+                Expression left = condition.getLHS().accept(lhsCompiler);
+                rhsCompiler.reset();
+                Expression right = condition.getRHS().accept(rhsCompiler);
                 PDataType toType = getCommonType(left.getDataType(), right.getDataType());
                 if (left.getDataType() != toType) {
                     left = CoerceExpression.create(left, toType);
-                    p.setFirst(left);
                 }
                 if (right.getDataType() != toType) {
                     right = CoerceExpression.create(right, toType);
                 }
-                p.setSecond(right);
+                compiled.add(new Pair<Expression, Expression>(left, right));
             }
-            context.setResolver(resolver); // recover the resolver
-            Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
-                @Override
-                public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) {
-                    Expression e1 = o1.getFirst();
-                    Expression e2 = o2.getFirst();
-                    boolean isFixed1 = e1.getDataType().isFixedWidth();
-                    boolean isFixed2 = e2.getDataType().isFixedWidth();
-                    boolean isFixedNullable1 = e1.isNullable() &&isFixed1;
-                    boolean isFixedNullable2 = e2.isNullable() && isFixed2;
-                    if (isFixedNullable1 == isFixedNullable2) {
-                        if (isFixed1 == isFixed2) {
-                            return 0;
-                        } else if (isFixed1) {
-                            return -1;
-                        } else {
+            if (sortExpressions) {
+                Collections.sort(compiled, new Comparator<Pair<Expression, Expression>>() {
+                    @Override
+                    public int compare(Pair<Expression, Expression> o1, Pair<Expression, Expression> o2) {
+                        Expression e1 = o1.getFirst();
+                        Expression e2 = o2.getFirst();
+                        boolean isFixed1 = e1.getDataType().isFixedWidth();
+                        boolean isFixed2 = e2.getDataType().isFixedWidth();
+                        boolean isFixedNullable1 = e1.isNullable() &&isFixed1;
+                        boolean isFixedNullable2 = e2.isNullable() && isFixed2;
+                        if (isFixedNullable1 == isFixedNullable2) {
+                            if (isFixed1 == isFixed2) {
+                                return 0;
+                            } else if (isFixed1) {
+                                return -1;
+                            } else {
+                                return 1;
+                            }
+                        } else if (isFixedNullable1) {
                             return 1;
+                        } else {
+                            return -1;
                         }
-                    } else if (isFixedNullable1) {
-                        return 1;
-                    } else {
-                        return -1;
                     }
-                }
-            });
-            List<Expression> lConditions = new ArrayList<Expression>(compiled.size());
-            List<Expression> rConditions = new ArrayList<Expression>(compiled.size());
+                });
+            }
+            List<Expression> lConditions = Lists.<Expression> newArrayListWithExpectedSize(compiled.size());
+            List<Expression> rConditions = Lists.<Expression> newArrayListWithExpectedSize(compiled.size());
             for (Pair<Expression, Expression> pair : compiled) {
                 lConditions.add(pair.getFirst());
                 rConditions.add(pair.getSecond());
             }
-            
+
             return new Pair<List<Expression>, List<Expression>>(lConditions, rConditions);
         }
         
@@ -683,11 +673,11 @@ public class JoinCompiler {
             return JoinCompiler.compilePostFilterExpression(context, postFilters);
         }
         
-        public SelectStatement getAsSubquery() throws SQLException {
+        public SelectStatement getAsSubquery(List<OrderByNode> orderBy) throws SQLException {
             if (isSubselect())
-                return SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias());
+                return SubselectRewriter.applyOrderBy(SubselectRewriter.applyPostFilters(subselect, preFilters, tableNode.getAlias()), orderBy, tableNode.getAlias());
             
-            return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, null, null, 0, false, select.hasSequence());
+            return NODE_FACTORY.select(tableNode, select.getHint(), false, selectNodes, getPreFiltersCombined(), null, null, orderBy, null, 0, false, select.hasSequence());
         }
         
         public boolean hasFilters() {
@@ -912,12 +902,12 @@ public class JoinCompiler {
     }
     
     private static class OnNodeVisitor extends BooleanParseNodeVisitor<Void> {
-        private List<ComparisonParseNode> onConditions;
+        private List<EqualParseNode> onConditions;
         private Set<TableRef> dependencies;
         private JoinTable joinTable;
         private ColumnRefParseNodeVisitor columnRefVisitor;
         
-        public OnNodeVisitor(ColumnResolver resolver, List<ComparisonParseNode> onConditions, 
+        public OnNodeVisitor(ColumnResolver resolver, List<EqualParseNode> onConditions, 
                 Set<TableRef> dependencies, JoinTable joinTable) {
             this.onConditions = onConditions;
             this.dependencies = dependencies;
@@ -981,7 +971,7 @@ public class JoinCompiler {
                 joinTable.addFilter(node);
             } else if (lhsType == ColumnRefParseNodeVisitor.ColumnRefType.FOREIGN_ONLY
                     && rhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
-                onConditions.add(node);
+                onConditions.add((EqualParseNode) node);
                 dependencies.addAll(lhsTableRefSet);
             } else if (rhsType == ColumnRefParseNodeVisitor.ColumnRefType.FOREIGN_ONLY
                     && lhsType == ColumnRefParseNodeVisitor.ColumnRefType.SELF_ONLY) {
@@ -1069,9 +1059,9 @@ public class JoinCompiler {
         }
     }
     
-    private static String PROJECTED_TABLE_SCHEMA = ".";
+    private static final String PROJECTED_TABLE_SCHEMA = ".";
     // for creation of new statements
-    private static ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
+    private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
     
     private static boolean isFlat(SelectStatement select) {
         return !select.isJoin() 
@@ -1167,7 +1157,7 @@ public class JoinCompiler {
             QueryCompiler compiler = new QueryCompiler(statement, select, resolver);
             List<Object> binds = statement.getParameters();
             StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement));
-            QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false);
+            QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null);
             TableRef table = plan.getTableRef();
             if (groupByTableRef != null && !groupByTableRef.equals(table)) {
                 groupByTableRef = null;
@@ -1303,17 +1293,30 @@ public class JoinCompiler {
             return new JoinedTableColumnResolver(this, origResolver);
         }
         
-        public PTableWrapper mergeProjectedTables(PTableWrapper rWrapper, boolean innerJoin) throws SQLException {
+        public PTableWrapper mergeProjectedTables(PTableWrapper rWrapper, JoinType type) throws SQLException {
             PTable left = this.getTable();
             PTable right = rWrapper.getTable();
-            List<PColumn> merged = new ArrayList<PColumn>();
-            merged.addAll(left.getColumns());
+            List<PColumn> merged = Lists.<PColumn> newArrayList();
+            if (type != JoinType.Full) {
+                merged.addAll(left.getColumns());
+            } else {
+                for (PColumn c : left.getColumns()) {
+                    if (SchemaUtil.isPKColumn(c)) {
+                        merged.add(c);
+                    } else {
+                        PColumnImpl column = new PColumnImpl(c.getName(), c.getFamilyName(), c.getDataType(), 
+                                c.getMaxLength(), c.getScale(), true, c.getPosition(), 
+                                c.getSortOrder(), c.getArraySize(), c.getViewConstant(), c.isViewReferenced());
+                        merged.add(column);
+                    }
+                }
+            }
             int position = merged.size();
             for (PColumn c : right.getColumns()) {
                 if (!SchemaUtil.isPKColumn(c)) {
                     PColumnImpl column = new PColumnImpl(c.getName(), 
                             PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), c.getDataType(), 
-                            c.getMaxLength(), c.getScale(), innerJoin ? c.isNullable() : true, position++, 
+                            c.getMaxLength(), c.getScale(), type == JoinType.Inner ? c.isNullable() : true, position++, 
                             c.getSortOrder(), c.getArraySize(), c.getViewConstant(), c.isViewReferenced());
                     merged.add(column);
                 }
@@ -1358,12 +1361,16 @@ public class JoinCompiler {
     	private JoinedTableColumnResolver(PTableWrapper table, ColumnResolver tableResolver) {
     		this.table = table;
     		this.tableResolver = tableResolver;
-            this.tableRef = new TableRef(null, table.getTable(), 0, false);
+            this.tableRef = new TableRef(ParseNodeFactory.createTempAlias(), table.getTable(), 0, false);
     	}
         
         public PTableWrapper getPTableWrapper() {
             return table;
         }
+        
+        public TableRef getTableRef() {
+            return tableRef;
+        }
 
 		@Override
 		public List<TableRef> getTables() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
index 2629846..444b05e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/OrderByCompiler.java
@@ -33,6 +33,7 @@ import org.apache.phoenix.parse.FilterableStatement;
 import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.SortOrder;
 
 import com.google.common.collect.ImmutableList;
@@ -77,7 +78,8 @@ public class OrderByCompiler {
      */
     public static OrderBy compile(StatementContext context,
                                   FilterableStatement statement,
-                                  GroupBy groupBy, Integer limit) throws SQLException {
+                                  GroupBy groupBy, Integer limit, 
+                                  boolean isInRowKeyOrder) throws SQLException {
         List<OrderByNode> orderByNodes = statement.getOrderBy();
         if (orderByNodes.isEmpty()) {
             return OrderBy.EMPTY_ORDER_BY;
@@ -115,11 +117,14 @@ public class OrderByCompiler {
             return OrderBy.EMPTY_ORDER_BY;
         }
         // If we're ordering by the order returned by the scan, we don't need an order by
-        if (visitor.isOrderPreserving()) {
+        if (isInRowKeyOrder && visitor.isOrderPreserving()) {
             if (visitor.isReverse()) {
                 // Don't use reverse scan if we're using a skip scan, as our skip scan doesn't support this yet.
+                // REV_ROW_KEY_ORDER_BY scan would not take effect for a projected table, so don't return it for such table types.
                 if (context.getConnection().getQueryServices().getProps().getBoolean(QueryServices.USE_REVERSE_SCAN_ATTRIB, QueryServicesOptions.DEFAULT_USE_REVERSE_SCAN)
-                        && !context.getScanRanges().useSkipScanFilter()) {
+                        && !context.getScanRanges().useSkipScanFilter()
+                        && context.getCurrentTable().getTable().getType() != PTableType.JOIN
+                        && context.getCurrentTable().getTable().getType() != PTableType.SUBQUERY) {
                     return OrderBy.REV_ROW_KEY_ORDER_BY;
                 }
             } else {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index e76c05c..126c870 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
 import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan;
 import org.apache.phoenix.execute.ScanPlan;
+import org.apache.phoenix.execute.SortMergeJoinPlan;
 import org.apache.phoenix.execute.TupleProjectionPlan;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
@@ -51,12 +52,17 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.parse.AliasedNode;
+import org.apache.phoenix.parse.EqualParseNode;
 import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.JoinTableNode.JoinType;
+import org.apache.phoenix.parse.OrderByNode;
 import org.apache.phoenix.parse.ParseNode;
+import org.apache.phoenix.parse.ParseNodeFactory;
 import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SubqueryParseNode;
+import org.apache.phoenix.parse.TableNode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -78,6 +84,7 @@ import com.google.common.collect.Sets;
  * @since 0.1
  */
 public class QueryCompiler {
+    private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();
     /* 
      * Not using Scan.setLoadColumnFamiliesOnDemand(true) because we don't 
      * want to introduce a dependency on 0.94.5 (where this feature was
@@ -93,6 +100,7 @@ public class QueryCompiler {
     private final List<? extends PDatum> targetColumns;
     private final ParallelIteratorFactory parallelIteratorFactory;
     private final SequenceManager sequenceManager;
+    private final boolean useSortMergeJoin;
     
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
         this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement));
@@ -106,6 +114,7 @@ public class QueryCompiler {
         this.targetColumns = targetColumns;
         this.parallelIteratorFactory = parallelIteratorFactory;
         this.sequenceManager = sequenceManager;
+        this.useSortMergeJoin = select.getHint().hasHint(Hint.USE_SORT_MERGE_JOIN);
         if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
             this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
         }
@@ -138,26 +147,56 @@ public class QueryCompiler {
                 context = new StatementContext(statement, resolver, scan, sequenceManager);
             }
             JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
-            return compileJoinQuery(context, binds, joinTable, false);
+            return compileJoinQuery(context, binds, joinTable, false, false, null);
         } else {
             return compileSingleQuery(context, select, binds, false, true);
         }
     }
-    
+
+    /*
+     * Call compileJoinQuery() for join queries recursively down to the leaf JoinTable nodes.
+     * This matches the input JoinTable node against patterns in the following order:
+     * 1. A (leaf JoinTable node, which can be a named table reference or a subquery of any kind.)
+     *    Returns the compilation result of a single table scan or of an independent subquery.
+     * 2. Matching either of (when hint USE_SORT_MERGE_JOIN not specified):
+     *        1) A LEFT/INNER JOIN B
+     *        2) A LEFT/INNER JOIN B (LEFT/INNER JOIN C)+, if hint NO_STAR_JOIN not specified
+     *        where A can be a named table reference or a flat subquery, and B, C, ... can be a named
+     *        table reference, a sub-join or a subquery of any kind.
+     *    Returns a HashJoinPlan{scan: A, hash: B, C, ...}.
+     * 3. Matching pattern:
+     *        A RIGHT/INNER JOIN B (when hint USE_SORT_MERGE_JOIN not specified)
+     *        where B can be a named table reference or a flat subquery, and A can be a named table
+     *        reference, a sub-join or a subquery of any kind.
+     *    Returns a HashJoinPlan{scan: B, hash: A}.
+     *    NOTE that "A LEFT/RIGHT/INNER/FULL JOIN B RIGHT/INNER JOIN C" is viewed as
+     *    "(A LEFT/RIGHT/INNER/FULL JOIN B) RIGHT/INNER JOIN C" here, which means the left part in the
+     *    parenthesis is considered a sub-join.
+     *    viewed as a sub-join.
+     * 4. All the rest that do not qualify for previous patterns or conditions, including FULL joins.
+     *    Returns a SortMergeJoinPlan, the sorting part of which is pushed down to the JoinTable nodes
+     *    of both sides as order-by clauses.
+     * NOTE that SEMI or ANTI joins are treated the same way as LEFT joins in JoinTable pattern matching.
+     *    
+     * If no join algorithm hint is provided, according to the above compilation process, a join query 
+     * plan can probably consist of both HashJoinPlan and SortMergeJoinPlan which may enclose each other.
+     * TODO 1) Use table statistics to guide the choice of join plans.
+     *      2) Make it possible to hint a certain join algorithm for a specific join step.
+     */
     @SuppressWarnings("unchecked")
-    protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery) throws SQLException {
+    protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery, boolean projectPKColumns, List<OrderByNode> orderBy) throws SQLException {
         byte[] emptyByteArray = new byte[0];
         List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
         if (joinSpecs.isEmpty()) {
             Table table = joinTable.getTable();
-            SelectStatement subquery = table.getAsSubquery();
+            SelectStatement subquery = table.getAsSubquery(orderBy);
             if (!table.isSubselect()) {
-                ProjectedPTableWrapper projectedTable = table.createProjectedTable(!asSubquery);
+                ProjectedPTableWrapper projectedTable = table.createProjectedTable(!projectPKColumns);
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector());
                 context.setCurrentTable(table.getTableRef());
                 context.setResolver(projectedTable.createColumnResolver());
                 table.projectColumns(context.getScan());
-                return compileSingleQuery(context, subquery, binds, asSubquery, true);
+                return compileSingleQuery(context, subquery, binds, asSubquery, !asSubquery);
             }
             QueryPlan plan = compileSubquery(subquery);
             ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector());
@@ -165,25 +204,26 @@ public class QueryCompiler {
             return new TupleProjectionPlan(plan, projectedTable.createTupleProjector(), table.compilePostFilterExpression(context));
         }
         
-        boolean[] starJoinVector = joinTable.getStarJoinVector();
-        if (starJoinVector != null) {
+        boolean[] starJoinVector;
+        if (!this.useSortMergeJoin && (starJoinVector = joinTable.getStarJoinVector()) != null) {
             Table table = joinTable.getTable();
             ProjectedPTableWrapper initialProjectedTable;
             TableRef tableRef;
             SelectStatement query;
             if (!table.isSubselect()) {
-                initialProjectedTable = table.createProjectedTable(!asSubquery);
+                initialProjectedTable = table.createProjectedTable(!projectPKColumns);
                 tableRef = table.getTableRef();
                 table.projectColumns(context.getScan());
-                query = joinTable.getAsSingleSubquery(table.getAsSubquery(), asSubquery);
+                query = joinTable.getAsSingleSubquery(table.getAsSubquery(orderBy), asSubquery);
             } else {
-                SelectStatement subquery = table.getAsSubquery();
+                SelectStatement subquery = table.getAsSubquery(orderBy);
                 QueryPlan plan = compileSubquery(subquery);
                 initialProjectedTable = table.createProjectedTable(plan.getProjector());
                 tableRef = plan.getTableRef();
                 context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
                 query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
             }
+            context.setCurrentTable(tableRef);
             PTableWrapper projectedTable = initialProjectedTable;
             int count = joinSpecs.size();
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
@@ -199,13 +239,12 @@ public class QueryCompiler {
                 JoinSpec joinSpec = joinSpecs.get(i);
                 Scan subScan = ScanUtil.newScan(originalScan);
                 StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true);
-                ColumnResolver resolver = subContext.getResolver();
+                QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true, true, null);
                 boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
                 if (hasPostReference) {
-                    PTableWrapper subProjTable = ((JoinedTableColumnResolver) (resolver)).getPTableWrapper();
+                    PTableWrapper subProjTable = ((JoinedTableColumnResolver) subContext.getResolver()).getPTableWrapper();
                     tables[i] = subProjTable.getTable();
-                    projectedTable = projectedTable.mergeProjectedTables(subProjTable, joinSpec.getType() == JoinType.Inner);
+                    projectedTable = projectedTable.mergeProjectedTables(subProjTable, joinSpec.getType());
                     needsProject = true;
                 } else {
                     tables[i] = null;
@@ -213,13 +252,13 @@ public class QueryCompiler {
                 if (!starJoinVector[i]) {
                     needsProject = true;
                 }
-                ColumnResolver leftResolver = (!forceProjection && starJoinVector[i]) ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver();
+                context.setResolver((!forceProjection && starJoinVector[i]) ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver());
                 joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
-                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, leftResolver, resolver);
+                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, subContext, true);
                 joinExpressions[i] = joinConditions.getFirst();
                 List<Expression> hashExpressions = joinConditions.getSecond();
                 Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
-                boolean complete = getKeyExpressionCombinations(keyRangeExpressions, context, tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
+                boolean complete = getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
                 Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
                 Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
                 boolean hasFilters = joinSpec.getJoinTable().hasFilters();
@@ -233,9 +272,8 @@ public class QueryCompiler {
             if (needsProject) {
                 TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector());
             }
-            context.setCurrentTable(tableRef);
             context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
-            QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, joinTable.isAllLeftJoin());
+            QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, !asSubquery && joinTable.isAllLeftJoin());
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
             Integer limit = null;
             if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
@@ -247,65 +285,114 @@ public class QueryCompiler {
         
         JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
         JoinType type = lastJoinSpec.getType();
-        if (type == JoinType.Full)
-            throw new SQLFeatureNotSupportedException(type + " joins not supported.");
-        
-        if (type == JoinType.Right || type == JoinType.Inner) {
-            if (!lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty())
-                throw new SQLFeatureNotSupportedException("Right join followed by sub-join is not supported.");
-            
+        if (!this.useSortMergeJoin 
+                && (type == JoinType.Right || type == JoinType.Inner) 
+                && lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty()
+                && lastJoinSpec.getJoinTable().getTable().isFlat()) {            
             JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
             Table rhsTable = rhsJoinTable.getTable();
             JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
             Scan subScan = ScanUtil.newScan(originalScan);
             StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-            QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true);
-            ColumnResolver lhsResolver = lhsCtx.getResolver();
-            PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) (lhsResolver)).getPTableWrapper();
+            QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, true, null);
+            PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) lhsCtx.getResolver()).getPTableWrapper();
             ProjectedPTableWrapper rhsProjTable;
             TableRef rhsTableRef;
             SelectStatement rhs;
             if (!rhsTable.isSubselect()) {
-                rhsProjTable = rhsTable.createProjectedTable(!asSubquery);
+                rhsProjTable = rhsTable.createProjectedTable(!projectPKColumns);
                 rhsTableRef = rhsTable.getTableRef();
                 rhsTable.projectColumns(context.getScan());
-                rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(), asSubquery);
+                rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(orderBy), asSubquery);
             } else {
-                SelectStatement subquery = rhsTable.getAsSubquery();
+                SelectStatement subquery = rhsTable.getAsSubquery(orderBy);
                 QueryPlan plan = compileSubquery(subquery);
                 rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
                 rhsTableRef = plan.getTableRef();
                 context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
                 rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
             }
+            context.setCurrentTable(rhsTableRef);
             boolean forceProjection = rhsTable.isSubselect();
-            ColumnResolver rhsResolver = forceProjection ? rhsProjTable.createColumnResolver() : joinTable.getOriginalResolver();
+            context.setResolver(forceProjection ? rhsProjTable.createColumnResolver() : joinTable.getOriginalResolver());
             ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
-            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(context, lhsResolver, rhsResolver);
+            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(lhsCtx, context, true);
             List<Expression> joinExpressions = joinConditions.getSecond();
             List<Expression> hashExpressions = joinConditions.getFirst();
-            int fieldPosition = rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size();
-            PTableWrapper projectedTable = rhsProjTable.mergeProjectedTables(lhsProjTable, type == JoinType.Inner);
-            TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector());
-            context.setCurrentTable(rhsTableRef);
-            context.setResolver(projectedTable.createColumnResolver());
-            QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, type == JoinType.Right);
+            boolean needsMerge = lhsJoin.hasPostReference();
+            boolean needsProject = forceProjection || asSubquery || needsMerge;
+            PTable lhsTable = needsMerge ? lhsProjTable.getTable() : null;
+            int fieldPosition = needsMerge ? rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size() : 0;
+            PTableWrapper projectedTable = needsMerge ? rhsProjTable.mergeProjectedTables(lhsProjTable, type == JoinType.Right ? JoinType.Left : type) : rhsProjTable;
+            if (needsProject) {
+                TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector());
+            }
+            context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
+            QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, !asSubquery && type == JoinType.Right);
             Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
             Integer limit = null;
             if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
                 limit = LimitCompiler.compile(context, rhs);
             }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
+            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsTable}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
             Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
-            getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions);
+            getKeyExpressionCombinations(keyRangeExpressions, context, joinTable.getStatement(), rhsTableRef, type, joinExpressions, hashExpressions);
             return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), lhsJoin.hasFilters())});
         }
         
-        // Do not support queries like "A right join B left join C" with hash-joins.
-        throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported.");
+        JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
+        JoinTable rhsJoin = lastJoinSpec.getJoinTable();        
+        if (type == JoinType.Right) {
+            JoinTable temp = lhsJoin;
+            lhsJoin = rhsJoin;
+            rhsJoin = temp;
+        }
+        
+        List<EqualParseNode> joinConditionNodes = lastJoinSpec.getOnConditions();
+        List<OrderByNode> lhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
+        List<OrderByNode> rhsOrderBy = Lists.<OrderByNode> newArrayListWithExpectedSize(joinConditionNodes.size());
+        for (EqualParseNode condition : joinConditionNodes) {
+            lhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getRHS() : condition.getLHS(), false, true));
+            rhsOrderBy.add(NODE_FACTORY.orderBy(type == JoinType.Right ? condition.getLHS() : condition.getRHS(), false, true));
+        }
+        
+        Scan lhsScan = ScanUtil.newScan(originalScan);
+        StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), lhsScan, new SequenceManager(statement));
+        boolean preserveRowkey = !projectPKColumns && type != JoinType.Full;
+        QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true, !preserveRowkey, lhsOrderBy);
+        PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) lhsCtx.getResolver()).getPTableWrapper();
+        boolean isInRowKeyOrder = preserveRowkey && lhsPlan.getOrderBy().getOrderByExpressions().isEmpty();
+        
+        Scan rhsScan = ScanUtil.newScan(originalScan);
+        StatementContext rhsCtx = new StatementContext(statement, context.getResolver(), rhsScan, new SequenceManager(statement));
+        QueryPlan rhsPlan = compileJoinQuery(rhsCtx, binds, rhsJoin, true, true, rhsOrderBy);
+        PTableWrapper rhsProjTable = ((JoinedTableColumnResolver) rhsCtx.getResolver()).getPTableWrapper();
+        
+        Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(type == JoinType.Right ? rhsCtx : lhsCtx, type == JoinType.Right ? lhsCtx : rhsCtx, false);
+        List<Expression> lhsKeyExpressions = type == JoinType.Right ? joinConditions.getSecond() : joinConditions.getFirst();
+        List<Expression> rhsKeyExpressions = type == JoinType.Right ? joinConditions.getFirst() : joinConditions.getSecond();
+        
+        boolean needsMerge = rhsJoin.hasPostReference();
+        PTable rhsTable = needsMerge ? rhsProjTable.getTable() : null;
+        int fieldPosition = needsMerge ? lhsProjTable.getTable().getColumns().size() - lhsProjTable.getTable().getPKColumns().size() : 0;
+        PTableWrapper projectedTable = needsMerge ? lhsProjTable.mergeProjectedTables(rhsProjTable, type == JoinType.Right ? JoinType.Left : type) : lhsProjTable;
+
+        ColumnResolver resolver = projectedTable.createColumnResolver();
+        TableRef tableRef = ((JoinedTableColumnResolver) resolver).getTableRef();
+        StatementContext subCtx = new StatementContext(statement, resolver, ScanUtil.newScan(originalScan), new SequenceManager(statement));
+        subCtx.setCurrentTable(tableRef);
+        QueryPlan innerPlan = new SortMergeJoinPlan(subCtx, joinTable.getStatement(), tableRef, type == JoinType.Right ? JoinType.Left : type, lhsPlan, rhsPlan, lhsKeyExpressions, rhsKeyExpressions, projectedTable.getTable(), lhsProjTable.getTable(), rhsTable, fieldPosition);
+        context.setCurrentTable(tableRef);
+        context.setResolver(resolver);
+        TableNode from = NODE_FACTORY.namedTable(tableRef.getTableAlias(), NODE_FACTORY.table(tableRef.getTable().getSchemaName().getString(), tableRef.getTable().getTableName().getString()));
+        ParseNode where = joinTable.getPostFiltersCombined();
+        SelectStatement select = asSubquery ? NODE_FACTORY.select(from, joinTable.getStatement().getHint(), false, Collections.<AliasedNode> emptyList(), where, null, null, orderBy, null, 0, false, joinTable.getStatement().hasSequence())
+                : NODE_FACTORY.select(joinTable.getStatement(), from, where);
+        
+        return compileSingleFlatQuery(context, select, binds, asSubquery, false, innerPlan, null, isInRowKeyOrder);
     }
     
-    private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {
+    private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, SelectStatement select, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {
         if (type != JoinType.Inner && type != JoinType.Semi)
             return false;
         
@@ -313,7 +400,7 @@ public class QueryCompiler {
         StatementContext contextCopy = new StatementContext(statement, context.getResolver(), scanCopy, new SequenceManager(statement));
         contextCopy.setCurrentTable(table);
         List<Expression> lhsCombination = Lists.<Expression> newArrayList();
-        boolean complete = WhereOptimizer.getKeyExpressionCombination(lhsCombination, contextCopy, this.select, joinExpressions);
+        boolean complete = WhereOptimizer.getKeyExpressionCombination(lhsCombination, contextCopy, select, joinExpressions);
         if (lhsCombination.isEmpty())
             return false;
         
@@ -355,7 +442,7 @@ public class QueryCompiler {
     protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
         SelectStatement innerSelect = select.getInnerSelectStatement();
         if (innerSelect == null) {
-            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null);
+            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null, true);
         }
         
         QueryPlan innerPlan = compileSubquery(innerSelect);
@@ -369,10 +456,10 @@ public class QueryCompiler {
         tableRef = resolver.getTables().get(0);
         context.setCurrentTable(tableRef);
         
-        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, innerPlan.getOrderBy().getOrderByExpressions().isEmpty() ? tupleProjector : null);
+        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, tupleProjector, innerPlan.getOrderBy().getOrderByExpressions().isEmpty());
     }
     
-    protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector) throws SQLException{
+    protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector, boolean isInRowKeyOrder) throws SQLException{
         PhoenixConnection connection = statement.getConnection();
         ColumnResolver resolver = context.getResolver();
         TableRef tableRef = context.getCurrentTable();
@@ -384,7 +471,7 @@ public class QueryCompiler {
         }
         Integer limit = LimitCompiler.compile(context, select);
 
-        GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector);
+        GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector, isInRowKeyOrder);
         // Optimize the HAVING clause by finding any group by expressions that can be moved
         // to the WHERE clause
         select = HavingCompiler.rewrite(context, select, groupBy);
@@ -397,7 +484,7 @@ public class QueryCompiler {
         Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet();
         Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries);
         context.setResolver(resolver); // recover resolver
-        OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit); 
+        OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit, isInRowKeyOrder); 
         RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns);
         
         // Final step is to build the query plan

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java.orig
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java.orig b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java.orig
deleted file mode 100644
index b9ca813..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java.orig
+++ /dev/null
@@ -1,444 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.compile;
-
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
-import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
-import org.apache.phoenix.compile.JoinCompiler.JoinTable;
-import org.apache.phoenix.compile.JoinCompiler.JoinedTableColumnResolver;
-import org.apache.phoenix.compile.JoinCompiler.PTableWrapper;
-import org.apache.phoenix.compile.JoinCompiler.ProjectedPTableWrapper;
-import org.apache.phoenix.compile.JoinCompiler.Table;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.execute.AggregatePlan;
-import org.apache.phoenix.execute.ClientAggregatePlan;
-import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.HashJoinPlan;
-import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
-import org.apache.phoenix.execute.HashJoinPlan.WhereClauseSubPlan;
-import org.apache.phoenix.execute.ScanPlan;
-import org.apache.phoenix.execute.TupleProjectionPlan;
-import org.apache.phoenix.execute.TupleProjector;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.expression.RowValueConstructorExpression;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.join.HashJoinInfo;
-import org.apache.phoenix.parse.HintNode.Hint;
-import org.apache.phoenix.parse.JoinTableNode.JoinType;
-import org.apache.phoenix.parse.ParseNode;
-import org.apache.phoenix.parse.SQLParser;
-import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.parse.SubqueryParseNode;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.AmbiguousColumnException;
-import org.apache.phoenix.schema.ColumnNotFoundException;
-import org.apache.phoenix.schema.PDatum;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.util.ScanUtil;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-
-
-/**
- * 
- * Class used to build an executable query plan
- *
- * 
- * @since 0.1
- */
-public class QueryCompiler {
-    /* 
-     * Not using Scan.setLoadColumnFamiliesOnDemand(true) because we don't 
-     * want to introduce a dependency on 0.94.5 (where this feature was
-     * introduced). This will do the same thing. Once we do have a 
-     * dependency on 0.94.5 or above, switch this around.
-     */
-    private static final String LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR = "_ondemand_";
-    private final PhoenixStatement statement;
-    private final Scan scan;
-    private final Scan originalScan;
-    private final ColumnResolver resolver;
-    private final SelectStatement select;
-    private final List<? extends PDatum> targetColumns;
-    private final ParallelIteratorFactory parallelIteratorFactory;
-    private final SequenceManager sequenceManager;
-    
-    public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException {
-        this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement));
-    }
-    
-    public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException {
-        this.statement = statement;
-        this.select = select;
-        this.resolver = resolver;
-        this.scan = new Scan();
-        this.targetColumns = targetColumns;
-        this.parallelIteratorFactory = parallelIteratorFactory;
-        this.sequenceManager = sequenceManager;
-        if (statement.getConnection().getQueryServices().getLowestClusterHBaseVersion() >= PhoenixDatabaseMetaData.ESSENTIAL_FAMILY_VERSION_THRESHOLD) {
-            this.scan.setAttribute(LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR, QueryConstants.TRUE);
-        }
-        if (select.getHint().hasHint(Hint.NO_CACHE)) {
-            scan.setCacheBlocks(false);
-        }
-
-        this.originalScan = ScanUtil.newScan(scan);
-    }
-
-    /**
-     * Builds an executable query plan from a parsed SQL statement
-     * @return executable query plan
-     * @throws SQLException if mismatched types are found, bind value do not match binds,
-     * or invalid function arguments are encountered.
-     * @throws SQLFeatureNotSupportedException if an unsupported construct is encountered
-     * @throws TableNotFoundException if table name not found in schema
-     * @throws ColumnNotFoundException if column name could not be resolved
-     * @throws AmbiguousColumnException if an unaliased column name is ambiguous across multiple tables
-     */
-    public QueryPlan compile() throws SQLException{
-        SelectStatement select = this.select;
-        List<Object> binds = statement.getParameters();
-        StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
-        if (select.isJoin()) {
-            select = JoinCompiler.optimize(statement, select, resolver);
-            if (this.select != select) {
-                ColumnResolver resolver = FromCompiler.getResolverForQuery(select, statement.getConnection());
-                context = new StatementContext(statement, resolver, scan, sequenceManager);
-            }
-            JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
-            return compileJoinQuery(context, binds, joinTable, false);
-        } else {
-            return compileSingleQuery(context, select, binds, false, true);
-        }
-    }
-    
-    @SuppressWarnings("unchecked")
-    protected QueryPlan compileJoinQuery(StatementContext context, List<Object> binds, JoinTable joinTable, boolean asSubquery) throws SQLException {
-        byte[] emptyByteArray = new byte[0];
-        List<JoinSpec> joinSpecs = joinTable.getJoinSpecs();
-        if (joinSpecs.isEmpty()) {
-            Table table = joinTable.getTable();
-            SelectStatement subquery = table.getAsSubquery();
-            if (!table.isSubselect()) {
-                ProjectedPTableWrapper projectedTable = table.createProjectedTable(!asSubquery);
-                TupleProjector.serializeProjectorIntoScan(context.getScan(), projectedTable.createTupleProjector());
-                context.setCurrentTable(table.getTableRef());
-                context.setResolver(projectedTable.createColumnResolver());
-                table.projectColumns(context.getScan());
-                return compileSingleQuery(context, subquery, binds, asSubquery, true);
-            }
-            QueryPlan plan = compileSubquery(subquery);
-            ProjectedPTableWrapper projectedTable = table.createProjectedTable(plan.getProjector());
-            context.setResolver(projectedTable.createColumnResolver());
-            return new TupleProjectionPlan(plan, projectedTable.createTupleProjector(), table.compilePostFilterExpression(context));
-        }
-        
-        boolean[] starJoinVector = joinTable.getStarJoinVector();
-        if (starJoinVector != null) {
-            Table table = joinTable.getTable();
-            ProjectedPTableWrapper initialProjectedTable;
-            TableRef tableRef;
-            SelectStatement query;
-            if (!table.isSubselect()) {
-                initialProjectedTable = table.createProjectedTable(!asSubquery);
-                tableRef = table.getTableRef();
-                table.projectColumns(context.getScan());
-                query = joinTable.getAsSingleSubquery(table.getAsSubquery(), asSubquery);
-            } else {
-                SelectStatement subquery = table.getAsSubquery();
-                QueryPlan plan = compileSubquery(subquery);
-                initialProjectedTable = table.createProjectedTable(plan.getProjector());
-                tableRef = plan.getTableRef();
-                context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
-                query = joinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
-            }
-            PTableWrapper projectedTable = initialProjectedTable;
-            int count = joinSpecs.size();
-            ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[count];
-            List<Expression>[] joinExpressions = new List[count];
-            JoinType[] joinTypes = new JoinType[count];
-            PTable[] tables = new PTable[count];
-            int[] fieldPositions = new int[count];
-            HashSubPlan[] subPlans = new HashSubPlan[count];
-            fieldPositions[0] = projectedTable.getTable().getColumns().size() - projectedTable.getTable().getPKColumns().size();
-            boolean forceProjection = table.isSubselect();
-            boolean needsProject = forceProjection || asSubquery;
-            for (int i = 0; i < count; i++) {
-                JoinSpec joinSpec = joinSpecs.get(i);
-                Scan subScan = ScanUtil.newScan(originalScan);
-                StatementContext subContext = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-                QueryPlan joinPlan = compileJoinQuery(subContext, binds, joinSpec.getJoinTable(), true);
-                ColumnResolver resolver = subContext.getResolver();
-                boolean hasPostReference = joinSpec.getJoinTable().hasPostReference();
-                if (hasPostReference) {
-                    PTableWrapper subProjTable = ((JoinedTableColumnResolver) (resolver)).getPTableWrapper();
-                    tables[i] = subProjTable.getTable();
-                    projectedTable = projectedTable.mergeProjectedTables(subProjTable, joinSpec.getType() == JoinType.Inner);
-                    needsProject = true;
-                } else {
-                    tables[i] = null;
-                }
-                if (!starJoinVector[i]) {
-                    needsProject = true;
-                }
-                ColumnResolver leftResolver = (!forceProjection && starJoinVector[i]) ? joinTable.getOriginalResolver() : projectedTable.createColumnResolver();
-                joinIds[i] = new ImmutableBytesPtr(emptyByteArray); // place-holder
-                Pair<List<Expression>, List<Expression>> joinConditions = joinSpec.compileJoinConditions(context, leftResolver, resolver);
-                joinExpressions[i] = joinConditions.getFirst();
-                List<Expression> hashExpressions = joinConditions.getSecond();
-                Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
-                boolean complete = getKeyExpressionCombinations(keyRangeExpressions, context, tableRef, joinSpec.getType(), joinExpressions[i], hashExpressions);
-                Expression keyRangeLhsExpression = keyRangeExpressions.getFirst();
-                Expression keyRangeRhsExpression = keyRangeExpressions.getSecond();
-                boolean hasFilters = joinSpec.getJoinTable().hasFilters();
-                boolean optimized = complete && hasFilters;
-                joinTypes[i] = joinSpec.getType();
-                if (i < count - 1) {
-                    fieldPositions[i + 1] = fieldPositions[i] + (tables[i] == null ? 0 : (tables[i].getColumns().size() - tables[i].getPKColumns().size()));
-                }
-                subPlans[i] = new HashSubPlan(i, joinPlan, optimized ? null : hashExpressions, joinSpec.isSingleValueOnly(), keyRangeLhsExpression, keyRangeRhsExpression, hasFilters);
-            }
-            if (needsProject) {
-                TupleProjector.serializeProjectorIntoScan(context.getScan(), initialProjectedTable.createTupleProjector());
-            }
-            context.setCurrentTable(tableRef);
-            context.setResolver(needsProject ? projectedTable.createColumnResolver() : joinTable.getOriginalResolver());
-            QueryPlan plan = compileSingleQuery(context, query, binds, asSubquery, joinTable.isAllLeftJoin());
-            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, table);
-            Integer limit = null;
-            if (query.getLimit() != null && !query.isAggregate() && !query.isDistinct() && query.getOrderBy().isEmpty()) {
-                limit = LimitCompiler.compile(context, query);
-            }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, joinExpressions, joinTypes, starJoinVector, tables, fieldPositions, postJoinFilterExpression, limit, forceProjection);
-            return HashJoinPlan.create(joinTable.getStatement(), plan, joinInfo, subPlans);
-        }
-        
-        JoinSpec lastJoinSpec = joinSpecs.get(joinSpecs.size() - 1);
-        JoinType type = lastJoinSpec.getType();
-        if (type == JoinType.Full)
-            throw new SQLFeatureNotSupportedException(type + " joins not supported.");
-        
-        if (type == JoinType.Right || type == JoinType.Inner) {
-            if (!lastJoinSpec.getJoinTable().getJoinSpecs().isEmpty())
-                throw new SQLFeatureNotSupportedException("Right join followed by sub-join is not supported.");
-            
-            JoinTable rhsJoinTable = lastJoinSpec.getJoinTable();
-            Table rhsTable = rhsJoinTable.getTable();
-            JoinTable lhsJoin = joinTable.getSubJoinTableWithoutPostFilters();
-            Scan subScan = ScanUtil.newScan(originalScan);
-            StatementContext lhsCtx = new StatementContext(statement, context.getResolver(), subScan, new SequenceManager(statement));
-            QueryPlan lhsPlan = compileJoinQuery(lhsCtx, binds, lhsJoin, true);
-            ColumnResolver lhsResolver = lhsCtx.getResolver();
-            PTableWrapper lhsProjTable = ((JoinedTableColumnResolver) (lhsResolver)).getPTableWrapper();
-            ProjectedPTableWrapper rhsProjTable;
-            TableRef rhsTableRef;
-            SelectStatement rhs;
-            if (!rhsTable.isSubselect()) {
-                rhsProjTable = rhsTable.createProjectedTable(!asSubquery);
-                rhsTableRef = rhsTable.getTableRef();
-                rhsTable.projectColumns(context.getScan());
-                rhs = rhsJoinTable.getAsSingleSubquery(rhsTable.getAsSubquery(), asSubquery);
-            } else {
-                SelectStatement subquery = rhsTable.getAsSubquery();
-                QueryPlan plan = compileSubquery(subquery);
-                rhsProjTable = rhsTable.createProjectedTable(plan.getProjector());
-                rhsTableRef = plan.getTableRef();
-                context.getScan().setFamilyMap(plan.getContext().getScan().getFamilyMap());
-                rhs = rhsJoinTable.getAsSingleSubquery((SelectStatement) plan.getStatement(), asSubquery);
-            }
-            boolean forceProjection = rhsTable.isSubselect();
-            ColumnResolver rhsResolver = forceProjection ? rhsProjTable.createColumnResolver() : joinTable.getOriginalResolver();
-            ImmutableBytesPtr[] joinIds = new ImmutableBytesPtr[] {new ImmutableBytesPtr(emptyByteArray)};
-            Pair<List<Expression>, List<Expression>> joinConditions = lastJoinSpec.compileJoinConditions(context, lhsResolver, rhsResolver);
-            List<Expression> joinExpressions = joinConditions.getSecond();
-            List<Expression> hashExpressions = joinConditions.getFirst();
-            int fieldPosition = rhsProjTable.getTable().getColumns().size() - rhsProjTable.getTable().getPKColumns().size();
-            PTableWrapper projectedTable = rhsProjTable.mergeProjectedTables(lhsProjTable, type == JoinType.Inner);
-            TupleProjector.serializeProjectorIntoScan(context.getScan(), rhsProjTable.createTupleProjector());
-            context.setCurrentTable(rhsTableRef);
-            context.setResolver(projectedTable.createColumnResolver());
-            QueryPlan rhsPlan = compileSingleQuery(context, rhs, binds, asSubquery, type == JoinType.Right);
-            Expression postJoinFilterExpression = joinTable.compilePostFilterExpression(context, rhsTable);
-            Integer limit = null;
-            if (rhs.getLimit() != null && !rhs.isAggregate() && !rhs.isDistinct() && rhs.getOrderBy().isEmpty()) {
-                limit = LimitCompiler.compile(context, rhs);
-            }
-            HashJoinInfo joinInfo = new HashJoinInfo(projectedTable.getTable(), joinIds, new List[] {joinExpressions}, new JoinType[] {type == JoinType.Right ? JoinType.Left : type}, new boolean[] {true}, new PTable[] {lhsProjTable.getTable()}, new int[] {fieldPosition}, postJoinFilterExpression, limit, forceProjection);
-            Pair<Expression, Expression> keyRangeExpressions = new Pair<Expression, Expression>(null, null);
-            getKeyExpressionCombinations(keyRangeExpressions, context, rhsTableRef, type, joinExpressions, hashExpressions);
-            return HashJoinPlan.create(joinTable.getStatement(), rhsPlan, joinInfo, new HashSubPlan[] {new HashSubPlan(0, lhsPlan, hashExpressions, false, keyRangeExpressions.getFirst(), keyRangeExpressions.getSecond(), lhsJoin.hasFilters())});
-        }
-        
-        // Do not support queries like "A right join B left join C" with hash-joins.
-        throw new SQLFeatureNotSupportedException("Joins with pattern 'A right join B left join C' not supported.");
-    }
-    
-    private boolean getKeyExpressionCombinations(Pair<Expression, Expression> combination, StatementContext context, TableRef table, JoinType type, final List<Expression> joinExpressions, final List<Expression> hashExpressions) throws SQLException {
-        if (type != JoinType.Inner && type != JoinType.Semi)
-            return false;
-        
-        Scan scanCopy = ScanUtil.newScan(context.getScan());
-        StatementContext contextCopy = new StatementContext(statement, context.getResolver(), scanCopy, new SequenceManager(statement));
-        contextCopy.setCurrentTable(table);
-        List<Expression> lhsCombination = Lists.<Expression> newArrayList();
-        boolean complete = WhereOptimizer.getKeyExpressionCombination(lhsCombination, contextCopy, this.select, joinExpressions);
-        if (lhsCombination.isEmpty())
-            return false;
-        
-        List<Expression> rhsCombination = Lists.newArrayListWithExpectedSize(lhsCombination.size());
-        for (int i = 0; i < lhsCombination.size(); i++) {
-            Expression lhs = lhsCombination.get(i);
-            for (int j = 0; j < joinExpressions.size(); j++) {
-                if (lhs == joinExpressions.get(j)) {
-                    rhsCombination.add(hashExpressions.get(j));
-                    break;
-                }
-            }
-        }
-        
-        if (lhsCombination.size() == 1) {
-            combination.setFirst(lhsCombination.get(0));
-            combination.setSecond(rhsCombination.get(0));
-        } else {
-            combination.setFirst(new RowValueConstructorExpression(lhsCombination, false));
-            combination.setSecond(new RowValueConstructorExpression(rhsCombination, false));
-        }
-        
-        return type == JoinType.Semi && complete;
-    }
-    
-    protected QueryPlan compileSubquery(SelectStatement subquery) throws SQLException {
-        subquery = SubselectRewriter.flatten(subquery, this.statement.getConnection());
-        ColumnResolver resolver = FromCompiler.getResolverForQuery(subquery, this.statement.getConnection());
-        subquery = StatementNormalizer.normalize(subquery, resolver);
-        SelectStatement transformedSubquery = SubqueryRewriter.transform(subquery, resolver, this.statement.getConnection());
-        if (transformedSubquery != subquery) {
-            resolver = FromCompiler.getResolverForQuery(transformedSubquery, this.statement.getConnection());
-            subquery = StatementNormalizer.normalize(transformedSubquery, resolver);
-        }
-        QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver).compile();
-        return statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan);
-    }
-    
-    protected QueryPlan compileSingleQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter) throws SQLException{
-        SelectStatement innerSelect = select.getInnerSelectStatement();
-        if (innerSelect == null) {
-            return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, null, null);
-        }
-        
-        QueryPlan innerPlan = compileSubquery(innerSelect);
-        TupleProjector tupleProjector = new TupleProjector(innerPlan.getProjector());
-        innerPlan = new TupleProjectionPlan(innerPlan, tupleProjector, null);
-        
-        // Replace the original resolver and table with those having compiled type info.
-        TableRef tableRef = context.getResolver().getTables().get(0);        
-        ColumnResolver resolver = FromCompiler.getResolverForCompiledDerivedTable(statement.getConnection(), tableRef, innerPlan.getProjector());
-        context.setResolver(resolver);
-        tableRef = resolver.getTables().get(0);
-        context.setCurrentTable(tableRef);
-        
-        return compileSingleFlatQuery(context, select, binds, asSubquery, allowPageFilter, innerPlan, innerPlan.getOrderBy().getOrderByExpressions().isEmpty() ? tupleProjector : null);
-    }
-    
-    protected QueryPlan compileSingleFlatQuery(StatementContext context, SelectStatement select, List<Object> binds, boolean asSubquery, boolean allowPageFilter, QueryPlan innerPlan, TupleProjector innerPlanTupleProjector) throws SQLException{
-        PhoenixConnection connection = statement.getConnection();
-        ColumnResolver resolver = context.getResolver();
-        TableRef tableRef = context.getCurrentTable();
-        PTable table = tableRef.getTable();
-        
-        ParseNode viewWhere = null;
-        if (table.getViewStatement() != null) {
-            viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere();
-        }
-        Integer limit = LimitCompiler.compile(context, select);
-
-        GroupBy groupBy = GroupByCompiler.compile(context, select, innerPlanTupleProjector);
-        // Optimize the HAVING clause by finding any group by expressions that can be moved
-        // to the WHERE clause
-        select = HavingCompiler.rewrite(context, select, groupBy);
-        Expression having = HavingCompiler.compile(context, select, groupBy);
-        // Don't pass groupBy when building where clause expression, because we do not want to wrap these
-        // expressions as group by key expressions since they're pre, not post filtered.
-        if (innerPlan == null) {
-            context.setResolver(FromCompiler.getResolverForQuery(select, connection));
-        }
-        Set<SubqueryParseNode> subqueries = Sets.<SubqueryParseNode> newHashSet();
-        Expression where = WhereCompiler.compile(context, select, viewWhere, subqueries);
-        context.setResolver(resolver); // recover resolver
-        OrderBy orderBy = OrderByCompiler.compile(context, select, groupBy, limit); 
-        RowProjector projector = ProjectionCompiler.compile(context, select, groupBy, asSubquery ? Collections.<PDatum>emptyList() : targetColumns);
-        
-        // Final step is to build the query plan
-        int maxRows = statement.getMaxRows();
-        if (maxRows > 0) {
-            if (limit != null) {
-                limit = Math.min(limit, maxRows);
-            } else {
-                limit = maxRows;
-            }
-        }
-        
-        QueryPlan plan = innerPlan;
-        if (plan == null) {
-            ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory;
-            plan = select.isAggregate() || select.isDistinct() ? 
-                      new AggregatePlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, groupBy, having)
-                    : new ScanPlan(context, select, tableRef, projector, limit, orderBy, parallelIteratorFactory, allowPageFilter);
-        }
-        if (!subqueries.isEmpty()) {
-            int count = subqueries.size();
-            WhereClauseSubPlan[] subPlans = new WhereClauseSubPlan[count];
-            int i = 0;
-            for (SubqueryParseNode subqueryNode : subqueries) {
-                SelectStatement stmt = subqueryNode.getSelectNode();
-                subPlans[i++] = new WhereClauseSubPlan(compileSubquery(stmt), stmt, subqueryNode.expectSingleRow());
-            }
-            plan = HashJoinPlan.create(select, plan, null, subPlans);
-        }
-        
-        if (innerPlan != null) {
-            if (LiteralExpression.isTrue(where)) {
-                where = null; // we do not pass "true" as filter
-            }
-            plan =  select.isAggregate() || select.isDistinct() ?
-                      new ClientAggregatePlan(context, select, tableRef, projector, limit, where, orderBy, groupBy, having, plan)
-                    : new ClientScanPlan(context, select, tableRef, projector, limit, where, orderBy, plan);
-
-        }
-        
-        return plan;
-    }
-}
-
-

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
index d229478..805894f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SubselectRewriter.java
@@ -58,6 +58,13 @@ public class SubselectRewriter extends ParseNodeRewriter {
         return statement.getLimit() == null && (!statement.isAggregate() || !statement.getGroupBy().isEmpty());        
     }
     
+    public static SelectStatement applyOrderBy(SelectStatement statement, List<OrderByNode> orderBy, String subqueryAlias) throws SQLException {
+        if (orderBy == null)
+            return statement;
+        
+        return new SubselectRewriter(null, statement.getSelect(), subqueryAlias).applyOrderBy(statement, orderBy);
+    }
+    
     public static SelectStatement flatten(SelectStatement select, PhoenixConnection connection) throws SQLException {
         TableNode from = select.getFrom();
         while (from != null && from instanceof DerivedTableNode) {
@@ -209,16 +216,24 @@ public class SubselectRewriter extends ParseNodeRewriter {
             if (where != null) {
                 postFiltersRewrite.add(where);
             }
-            return NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), combine(postFiltersRewrite), statement.getGroupBy(), statement.getHaving(), statement.getOrderBy(), statement.getLimit(),
-                    statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+            return NODE_FACTORY.select(statement, combine(postFiltersRewrite));
         }
         
         ParseNode having = statement.getHaving();
         if (having != null) {
             postFiltersRewrite.add(having);
         }
-        return NODE_FACTORY.select(statement.getFrom(), statement.getHint(), statement.isDistinct(), statement.getSelect(), statement.getWhere(), statement.getGroupBy(), combine(postFiltersRewrite), statement.getOrderBy(), statement.getLimit(),
-                statement.getBindCount(), statement.isAggregate(), statement.hasSequence());
+        return NODE_FACTORY.select(statement, statement.getWhere(), combine(postFiltersRewrite));
+    }
+    
+    private SelectStatement applyOrderBy(SelectStatement statement, List<OrderByNode> orderBy) throws SQLException {
+        List<OrderByNode> orderByRewrite = Lists.<OrderByNode> newArrayListWithExpectedSize(orderBy.size());
+        for (OrderByNode orderByNode : orderBy) {
+            ParseNode node = orderByNode.getNode();
+            orderByRewrite.add(NODE_FACTORY.orderBy(node.accept(this), orderByNode.isNullsLast(), orderByNode.isAscending()));
+        }
+        
+        return NODE_FACTORY.select(statement, orderByRewrite);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 0be40b8..796f368 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -419,6 +419,11 @@ public class UpsertCompiler {
                     try {
                         QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement));
                         queryPlanToBe = compiler.compile();
+                        // This is post-fix: if the tableRef is a projected table, this means there are post-processing 
+                        // steps and parallelIteratorFactory did not take effect.
+                        if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.JOIN || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) {
+                            parallelIteratorFactoryToBe = null;
+                        }
                     } catch (MetaDataEntityNotFoundException e) {
                         retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache
                         throw e;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
index 2c49fed..471ee37 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java
@@ -140,7 +140,9 @@ public class WhereCompiler {
             expression = AndExpression.create(filters);
         }
         
-        expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes);
+        if (context.getCurrentTable().getTable().getType() != PTableType.JOIN && context.getCurrentTable().getTable().getType() != PTableType.SUBQUERY) {
+            expression = WhereOptimizer.pushKeyExpressionsToScan(context, statement, expression, extractedNodes);
+        }
         setScanFilter(context, statement, expression, whereCompiler.disambiguateWithFamily, hashJoinOptimization);
 
         return expression;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 724122d..176520e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -154,7 +154,7 @@ public class HashJoinRegionScanner implements RegionScanner {
                 for (int i = 0; i < count; i++) {
                     boolean earlyEvaluation = joinInfo.earlyEvaluation()[i];
                     JoinType type = joinInfo.getJoinTypes()[i];
-                    if (earlyEvaluation && (tempTuples[i] == null || type == JoinType.Semi))
+                    if (earlyEvaluation && (type == JoinType.Semi || type == JoinType.Anti))
                         continue;
                     int j = resultQueue.size();
                     while (j-- > 0) {
@@ -163,12 +163,23 @@ public class HashJoinRegionScanner implements RegionScanner {
                             ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(lhs, joinInfo.getJoinExpressions()[i]);
                             tempTuples[i] = hashCaches[i].get(key);                        	
                             if (tempTuples[i] == null) {
-                                if (type != JoinType.Inner && type != JoinType.Semi) {
+                                if (type == JoinType.Inner || type == JoinType.Semi) {
+                                    continue;
+                                } else if (type == JoinType.Anti) {
                                     resultQueue.offer(lhs);
+                                    continue;
                                 }
-                                continue;
                             }
                         }
+                        if (tempTuples[i] == null) {
+                            Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
+                                    lhs : TupleProjector.mergeProjectedValue(
+                                            (ProjectedValueTuple) lhs, schema, tempDestBitSet,
+                                            null, joinInfo.getSchemas()[i], tempSrcBitSet[i], 
+                                            joinInfo.getFieldPositions()[i]);
+                            resultQueue.offer(joined);
+                            continue;
+                        }
                         for (Tuple t : tempTuples[i]) {
                             Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ?
                                     lhs : TupleProjector.mergeProjectedValue(


[3/4] phoenix git commit: PHOENIX-1799 Support many-to-many joins

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ebc7ee42/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
new file mode 100644
index 0000000..469388b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SortMergeJoinIT.java
@@ -0,0 +1,2822 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.JOIN_CUSTOMER_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_DISPLAY_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ITEM_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_DISPLAY_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_ORDER_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_SCHEMA;
+import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_DISPLAY_NAME;
+import static org.apache.phoenix.util.TestUtil.JOIN_SUPPLIER_TABLE_FULL_NAME;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+@Category(HBaseManagedTimeTest.class)
+@RunWith(Parameterized.class)
+public class SortMergeJoinIT extends BaseHBaseManagedTimeIT {
+    
+    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    private String[] indexDDL;
+    private String[] plans;
+    
+    public SortMergeJoinIT(String[] indexDDL, String[] plans) {
+        this.indexDDL = indexDDL;
+        this.plans = plans;
+    }
+    
+    @BeforeClass
+    @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Forces server cache to be used
+        props.put(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, Integer.toString(2));
+        // Must update config before starting server
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Before
+    public void initTable() throws Exception {
+        initTableValues();
+        if (indexDDL != null && indexDDL.length > 0) {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            Connection conn = DriverManager.getConnection(getUrl(), props);
+            for (String ddl : indexDDL) {
+                try {
+                    conn.createStatement().execute(ddl);
+                } catch (TableAlreadyExistsException e) {
+                }
+            }
+            conn.close();
+        }
+    }
+    
+    @Parameters
+    public static Collection<Object> data() {
+        List<Object> testCases = Lists.newArrayList();
+        testCases.add(new String[][] {
+                {}, {
+                "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + "\n" +
+                "AND\n" +
+                "    SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ITEM_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER SORTED BY [I.item_id]\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    AND (SKIP MERGE)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER FILTER BY QUANTITY < 5000\n" +
+                "            SERVER SORTED BY [O.item_id]\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    CLIENT SORTED BY [I.supplier_id]",
+                }});
+        testCases.add(new String[][] {
+                {
+                "CREATE INDEX \"idx_customer\" ON " + JOIN_CUSTOMER_TABLE_FULL_NAME + " (name)",
+                "CREATE INDEX \"idx_item\" ON " + JOIN_ITEM_TABLE_FULL_NAME + " (name) INCLUDE (price, discount1, discount2, \"supplier_id\", description)",
+                "CREATE INDEX \"idx_supplier\" ON " + JOIN_SUPPLIER_TABLE_FULL_NAME + " (name)"
+                }, {
+                "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_supplier\n" +
+                "        SERVER SORTED BY [S.:supplier_id]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "AND\n" +
+                "    SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_SCHEMA + ".idx_item\n" +
+                "            SERVER SORTED BY [I.:item_id]\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    AND (SKIP MERGE)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER FILTER BY QUANTITY < 5000\n" +
+                "            SERVER SORTED BY [O.item_id]\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    CLIENT SORTED BY [I.0:supplier_id]"
+                }});
+        testCases.add(new String[][] {
+                {
+                "CREATE LOCAL INDEX \"idx_customer\" ON " + JOIN_CUSTOMER_TABLE_FULL_NAME + " (name)",
+                "CREATE LOCAL INDEX \"idx_item\" ON " + JOIN_ITEM_TABLE_FULL_NAME + " (name) INCLUDE (price, discount1, discount2, \"supplier_id\", description)",
+                "CREATE LOCAL INDEX \"idx_supplier\" ON " + JOIN_SUPPLIER_TABLE_FULL_NAME + " (name)"
+                }, {
+                "SORT-MERGE-JOIN (LEFT) TABLES\n" +
+                "    CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + JOIN_SUPPLIER_TABLE_DISPLAY_NAME + " [-32768]\n" +
+                "        SERVER SORTED BY [S.:supplier_id]\n" +
+                "    CLIENT MERGE SORT\n" +
+                "AND\n" +
+                "    SORT-MERGE-JOIN (INNER) TABLES\n" +
+                "        CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX + JOIN_ITEM_TABLE_DISPLAY_NAME + " [-32768]\n" +
+                "            SERVER SORTED BY [I.:item_id]\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    AND (SKIP MERGE)\n" +
+                "        CLIENT PARALLEL 1-WAY FULL SCAN OVER " + JOIN_ORDER_TABLE_DISPLAY_NAME + "\n" +
+                "            SERVER FILTER BY QUANTITY < 5000\n" +
+                "            SERVER SORTED BY [O.item_id]\n" +
+                "        CLIENT MERGE SORT\n" +
+                "    CLIENT SORTED BY [I.0:supplier_id]"
+                }});
+        return testCases;
+    }
+    
+    
+    protected void initTableValues() throws Exception {
+        ensureTableCreated(getUrl(), JOIN_CUSTOMER_TABLE_FULL_NAME);
+        ensureTableCreated(getUrl(), JOIN_ITEM_TABLE_FULL_NAME);
+        ensureTableCreated(getUrl(), JOIN_SUPPLIER_TABLE_FULL_NAME);
+        ensureTableCreated(getUrl(), JOIN_ORDER_TABLE_FULL_NAME);
+        
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.createStatement().execute("CREATE SEQUENCE my.seq");
+            // Insert into customer table
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " + JOIN_CUSTOMER_TABLE_FULL_NAME +
+                    "   (\"customer_id\", " +
+                    "    NAME, " +
+                    "    PHONE, " +
+                    "    ADDRESS, " +
+                    "    LOC_ID, " +
+                    "    DATE) " +
+                    "values (?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, "0000000001");
+            stmt.setString(2, "C1");
+            stmt.setString(3, "999-999-1111");
+            stmt.setString(4, "101 XXX Street");
+            stmt.setString(5, "10001");
+            stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
+            stmt.execute();
+                
+            stmt.setString(1, "0000000002");
+            stmt.setString(2, "C2");
+            stmt.setString(3, "999-999-2222");
+            stmt.setString(4, "202 XXX Street");
+            stmt.setString(5, null);
+            stmt.setDate(6, new Date(format.parse("2013-11-25 16:45:07").getTime()));
+            stmt.execute();
+
+            stmt.setString(1, "0000000003");
+            stmt.setString(2, "C3");
+            stmt.setString(3, "999-999-3333");
+            stmt.setString(4, "303 XXX Street");
+            stmt.setString(5, null);
+            stmt.setDate(6, new Date(format.parse("2013-11-25 10:06:29").getTime()));
+            stmt.execute();
+
+            stmt.setString(1, "0000000004");
+            stmt.setString(2, "C4");
+            stmt.setString(3, "999-999-4444");
+            stmt.setString(4, "404 XXX Street");
+            stmt.setString(5, "10004");
+            stmt.setDate(6, new Date(format.parse("2013-11-22 14:22:56").getTime()));
+            stmt.execute();
+
+            stmt.setString(1, "0000000005");
+            stmt.setString(2, "C5");
+            stmt.setString(3, "999-999-5555");
+            stmt.setString(4, "505 XXX Street");
+            stmt.setString(5, "10005");
+            stmt.setDate(6, new Date(format.parse("2013-11-27 09:37:50").getTime()));
+            stmt.execute();
+
+            stmt.setString(1, "0000000006");
+            stmt.setString(2, "C6");
+            stmt.setString(3, "999-999-6666");
+            stmt.setString(4, "606 XXX Street");
+            stmt.setString(5, "10001");
+            stmt.setDate(6, new Date(format.parse("2013-11-01 10:20:36").getTime()));
+            stmt.execute();
+            
+            // Insert into item table
+            stmt = conn.prepareStatement(
+                    "upsert into " + JOIN_ITEM_TABLE_FULL_NAME +
+                    "   (\"item_id\", " +
+                    "    NAME, " +
+                    "    PRICE, " +
+                    "    DISCOUNT1, " +
+                    "    DISCOUNT2, " +
+                    "    \"supplier_id\", " +
+                    "    DESCRIPTION) " +
+                    "values (?, ?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, "0000000001");
+            stmt.setString(2, "T1");
+            stmt.setInt(3, 100);
+            stmt.setInt(4, 5);
+            stmt.setInt(5, 10);
+            stmt.setString(6, "0000000001");
+            stmt.setString(7, "Item T1");
+            stmt.execute();
+
+            stmt.setString(1, "0000000002");
+            stmt.setString(2, "T2");
+            stmt.setInt(3, 200);
+            stmt.setInt(4, 5);
+            stmt.setInt(5, 8);
+            stmt.setString(6, "0000000001");
+            stmt.setString(7, "Item T2");
+            stmt.execute();
+
+            stmt.setString(1, "0000000003");
+            stmt.setString(2, "T3");
+            stmt.setInt(3, 300);
+            stmt.setInt(4, 8);
+            stmt.setInt(5, 12);
+            stmt.setString(6, "0000000002");
+            stmt.setString(7, "Item T3");
+            stmt.execute();
+
+            stmt.setString(1, "0000000004");
+            stmt.setString(2, "T4");
+            stmt.setInt(3, 400);
+            stmt.setInt(4, 6);
+            stmt.setInt(5, 10);
+            stmt.setString(6, "0000000002");
+            stmt.setString(7, "Item T4");
+            stmt.execute();
+
+            stmt.setString(1, "0000000005");
+            stmt.setString(2, "T5");
+            stmt.setInt(3, 500);
+            stmt.setInt(4, 8);
+            stmt.setInt(5, 15);
+            stmt.setString(6, "0000000005");
+            stmt.setString(7, "Item T5");
+            stmt.execute();
+
+            stmt.setString(1, "0000000006");
+            stmt.setString(2, "T6");
+            stmt.setInt(3, 600);
+            stmt.setInt(4, 8);
+            stmt.setInt(5, 15);
+            stmt.setString(6, "0000000006");
+            stmt.setString(7, "Item T6");
+            stmt.execute();
+            
+            stmt.setString(1, "invalid001");
+            stmt.setString(2, "INVALID-1");
+            stmt.setInt(3, 0);
+            stmt.setInt(4, 0);
+            stmt.setInt(5, 0);
+            stmt.setString(6, "0000000000");
+            stmt.setString(7, "Invalid item for join test");
+            stmt.execute();
+
+            // Insert into supplier table
+            stmt = conn.prepareStatement(
+                    "upsert into " + JOIN_SUPPLIER_TABLE_FULL_NAME +
+                    "   (\"supplier_id\", " +
+                    "    NAME, " +
+                    "    PHONE, " +
+                    "    ADDRESS, " +
+                    "    LOC_ID) " +
+                    "values (?, ?, ?, ?, ?)");
+            stmt.setString(1, "0000000001");
+            stmt.setString(2, "S1");
+            stmt.setString(3, "888-888-1111");
+            stmt.setString(4, "101 YYY Street");
+            stmt.setString(5, "10001");
+            stmt.execute();
+                
+            stmt.setString(1, "0000000002");
+            stmt.setString(2, "S2");
+            stmt.setString(3, "888-888-2222");
+            stmt.setString(4, "202 YYY Street");
+            stmt.setString(5, "10002");
+            stmt.execute();
+
+            stmt.setString(1, "0000000003");
+            stmt.setString(2, "S3");
+            stmt.setString(3, "888-888-3333");
+            stmt.setString(4, "303 YYY Street");
+            stmt.setString(5, null);
+            stmt.execute();
+
+            stmt.setString(1, "0000000004");
+            stmt.setString(2, "S4");
+            stmt.setString(3, "888-888-4444");
+            stmt.setString(4, "404 YYY Street");
+            stmt.setString(5, null);
+            stmt.execute();
+
+            stmt.setString(1, "0000000005");
+            stmt.setString(2, "S5");
+            stmt.setString(3, "888-888-5555");
+            stmt.setString(4, "505 YYY Street");
+            stmt.setString(5, "10005");
+            stmt.execute();
+
+            stmt.setString(1, "0000000006");
+            stmt.setString(2, "S6");
+            stmt.setString(3, "888-888-6666");
+            stmt.setString(4, "606 YYY Street");
+            stmt.setString(5, "10006");
+            stmt.execute();
+
+            // Insert into order table
+            stmt = conn.prepareStatement(
+                    "upsert into " + JOIN_ORDER_TABLE_FULL_NAME +
+                    "   (\"order_id\", " +
+                    "    \"customer_id\", " +
+                    "    \"item_id\", " +
+                    "    PRICE, " +
+                    "    QUANTITY," +
+                    "    DATE) " +
+                    "values (?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, "000000000000001");
+            stmt.setString(2, "0000000004");
+            stmt.setString(3, "0000000001");
+            stmt.setInt(4, 100);
+            stmt.setInt(5, 1000);
+            stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-22 14:22:56").getTime()));
+            stmt.execute();
+
+            stmt.setString(1, "000000000000002");
+            stmt.setString(2, "0000000003");
+            stmt.setString(3, "0000000006");
+            stmt.setInt(4, 552);
+            stmt.setInt(5, 2000);
+            stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 10:06:29").getTime()));
+            stmt.execute();
+
+            stmt.setString(1, "000000000000003");
+            stmt.setString(2, "0000000002");
+            stmt.setString(3, "0000000002");
+            stmt.setInt(4, 190);
+            stmt.setInt(5, 3000);
+            stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-25 16:45:07").getTime()));
+            stmt.execute();
+
+            stmt.setString(1, "000000000000004");
+            stmt.setString(2, "0000000004");
+            stmt.setString(3, "0000000006");
+            stmt.setInt(4, 510);
+            stmt.setInt(5, 4000);
+            stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-26 13:26:04").getTime()));
+            stmt.execute();
+
+            stmt.setString(1, "000000000000005");
+            stmt.setString(2, "0000000005");
+            stmt.setString(3, "0000000003");
+            stmt.setInt(4, 264);
+            stmt.setInt(5, 5000);
+            stmt.setTimestamp(6, new Timestamp(format.parse("2013-11-27 09:37:50").getTime()));
+            stmt.execute();
+
+            conn.commit();
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testDefaultJoin() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000006");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "0000000006");
+            assertEquals(rs.getString(4), "S6");
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
+    public void testInnerJoin() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name, next value for my.seq FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertEquals(1, rs.getInt(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertEquals(2, rs.getInt(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertEquals(3, rs.getInt(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertEquals(4, rs.getInt(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+            assertEquals(5, rs.getInt(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000006");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "0000000006");
+            assertEquals(rs.getString(4), "S6");
+            assertEquals(6, rs.getInt(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+            
+    @Test
+    public void testLeftJoin() throws Exception {
+        String query[] = new String[3];
+        query[0] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name, next value for my.seq FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        query[1] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ " + JOIN_ITEM_TABLE_FULL_NAME + ".\"item_id\", " + JOIN_ITEM_TABLE_FULL_NAME + ".name, " + JOIN_SUPPLIER_TABLE_FULL_NAME + ".\"supplier_id\", " + JOIN_SUPPLIER_TABLE_FULL_NAME + ".name, next value for my.seq FROM " + JOIN_ITEM_TABLE_FULL_NAME + " LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " ON " + JOIN_ITEM_TABLE_FULL_NAME + ".\"supplier_id\" = " + JOIN_SUPPLIER_TABLE_FULL_NAME + ".\"supplier_id\" ORDER BY \"item_id\"";
+        query[2] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", " + JOIN_ITEM_TABLE_FULL_NAME + ".name, supp.\"supplier_id\", " + JOIN_SUPPLIER_TABLE_FULL_NAME + ".name, next value for my.seq FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON " + JOIN_ITEM_TABLE_FULL_NAME + ".\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            for (int i = 0; i < query.length; i++) {
+                PreparedStatement statement = conn.prepareStatement(query[i]);
+                ResultSet rs = statement.executeQuery();
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "0000000001");
+                assertEquals(rs.getString(2), "T1");
+                assertEquals(rs.getString(3), "0000000001");
+                assertEquals(rs.getString(4), "S1");
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "0000000002");
+                assertEquals(rs.getString(2), "T2");
+                assertEquals(rs.getString(3), "0000000001");
+                assertEquals(rs.getString(4), "S1");
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "0000000003");
+                assertEquals(rs.getString(2), "T3");
+                assertEquals(rs.getString(3), "0000000002");
+                assertEquals(rs.getString(4), "S2");
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "0000000004");
+                assertEquals(rs.getString(2), "T4");
+                assertEquals(rs.getString(3), "0000000002");
+                assertEquals(rs.getString(4), "S2");
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "0000000005");
+                assertEquals(rs.getString(2), "T5");
+                assertEquals(rs.getString(3), "0000000005");
+                assertEquals(rs.getString(4), "S5");
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "0000000006");
+                assertEquals(rs.getString(2), "T6");
+                assertEquals(rs.getString(3), "0000000006");
+                assertEquals(rs.getString(4), "S6");
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "invalid001");
+                assertEquals(rs.getString(2), "INVALID-1");
+                assertNull(rs.getString(3));
+                assertNull(rs.getString(4));
+
+                assertFalse(rs.next());
+                rs.close();
+                statement.close();
+            }
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testRightJoin() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp RIGHT JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000006");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "0000000006");
+            assertEquals(rs.getString(4), "S6");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "invalid001");
+            assertEquals(rs.getString(2), "INVALID-1");
+            assertNull(rs.getString(3));
+            assertNull(rs.getString(4));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testInnerJoinWithPreFilters() throws Exception {
+        String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005' ORDER BY \"item_id\"";
+        String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item INNER JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005') ORDER BY \"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+
+            assertFalse(rs.next());
+            
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testLeftJoinWithPreFilters() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" AND (supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005') ORDER BY \"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "T3");
+            assertNull(rs.getString(3));
+            assertNull(rs.getString(4));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "T4");
+            assertNull(rs.getString(3));
+            assertNull(rs.getString(4));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000006");
+            assertEquals(rs.getString(2), "T6");
+            assertNull(rs.getString(3));
+            assertNull(rs.getString(4));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "invalid001");
+            assertEquals(rs.getString(2), "INVALID-1");
+            assertNull(rs.getString(3));
+            assertNull(rs.getString(4));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testJoinWithPostFilters() throws Exception {
+        String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp RIGHT JOIN " + JOIN_ITEM_TABLE_FULL_NAME + " item ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE supp.\"supplier_id\" BETWEEN '0000000001' AND '0000000005' ORDER BY \"item_id\"";
+        String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM " + JOIN_ITEM_TABLE_FULL_NAME + " item LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" WHERE supp.\"supplier_id\" = '0000000001' OR supp.\"supplier_id\" = '0000000005' ORDER BY \"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "0000000002");
+            assertEquals(rs.getString(4), "S2");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+
+            assertFalse(rs.next());
+            
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "0000000001");
+            assertEquals(rs.getString(4), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "0000000005");
+            assertEquals(rs.getString(4), "S5");
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testStarJoin() throws Exception {
+        String[] query = new String[5];
+        query[0] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " 
+            + JOIN_CUSTOMER_TABLE_FULL_NAME + " c ON o.\"customer_id\" = c.\"customer_id\" JOIN " 
+            + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\"";
+        query[1] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o, " 
+                + JOIN_CUSTOMER_TABLE_FULL_NAME + " c, " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i WHERE o.\"item_id\" = i.\"item_id\" AND o.\"customer_id\" = c.\"customer_id\" ORDER BY \"order_id\"";
+        query[2] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o JOIN " 
+                + JOIN_CUSTOMER_TABLE_FULL_NAME + " c ON o.\"customer_id\" = c.\"customer_id\" JOIN " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\"";
+        query[3] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM (" + JOIN_ORDER_TABLE_FULL_NAME + " o, " 
+                + JOIN_CUSTOMER_TABLE_FULL_NAME + " c), " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i WHERE o.\"item_id\" = i.\"item_id\" AND o.\"customer_id\" = c.\"customer_id\" ORDER BY \"order_id\"";
+        query[4] = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, i.name iname, quantity, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o, (" 
+                + JOIN_CUSTOMER_TABLE_FULL_NAME + " c, " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i) WHERE o.\"item_id\" = i.\"item_id\" AND o.\"customer_id\" = c.\"customer_id\" ORDER BY \"order_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            for (int i = 0; i < query.length; i++) {
+                PreparedStatement statement = conn.prepareStatement(query[i]);
+                ResultSet rs = statement.executeQuery();
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000001");
+                assertEquals(rs.getString("\"order_id\""), "000000000000001");
+                assertEquals(rs.getString(2), "C4");
+                assertEquals(rs.getString("C.name"), "C4");
+                assertEquals(rs.getString(3), "T1");
+                assertEquals(rs.getString("iName"), "T1");
+                assertEquals(rs.getInt(4), 1000);
+                assertEquals(rs.getInt("Quantity"), 1000);
+                assertNotNull(rs.getDate(5));
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000002");
+                assertEquals(rs.getString(2), "C3");
+                assertEquals(rs.getString(3), "T6");
+                assertEquals(rs.getInt(4), 2000);
+                assertNotNull(rs.getDate(5));
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000003");
+                assertEquals(rs.getString(2), "C2");
+                assertEquals(rs.getString(3), "T2");
+                assertEquals(rs.getInt(4), 3000);
+                assertNotNull(rs.getDate(5));
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000004");
+                assertEquals(rs.getString(2), "C4");
+                assertEquals(rs.getString(3), "T6");
+                assertEquals(rs.getInt(4), 4000);
+                assertNotNull(rs.getDate(5));
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000005");
+                assertEquals(rs.getString(2), "C5");
+                assertEquals(rs.getString(3), "T3");
+                assertEquals(rs.getInt(4), 5000);
+                assertNotNull(rs.getDate(5));
+
+                assertFalse(rs.next());
+            }
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testLeftJoinWithAggregation() throws Exception {
+        String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN " 
+            + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" GROUP BY i.name ORDER BY i.name";
+        String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.\"item_id\" iid, sum(quantity) q FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" GROUP BY i.\"item_id\" ORDER BY q DESC";
+        String query3 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.\"item_id\" iid, sum(quantity) q FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i LEFT JOIN " 
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" GROUP BY i.\"item_id\" ORDER BY q DESC NULLS LAST, iid";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T1");
+            assertEquals(rs.getInt(2), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T2");
+            assertEquals(rs.getInt(2), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T3");
+            assertEquals(rs.getInt(2), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T6");
+            assertEquals(rs.getInt(2), 6000);
+
+            assertFalse(rs.next());
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000006");
+            assertEquals(rs.getInt("q"), 6000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000003");
+            assertEquals(rs.getInt("q"), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000002");
+            assertEquals(rs.getInt("q"), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000001");
+            assertEquals(rs.getInt("q"), 1000);
+
+            assertFalse(rs.next());
+            
+            statement = conn.prepareStatement(query3);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000006");
+            assertEquals(rs.getInt("q"), 6000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000003");
+            assertEquals(rs.getInt("q"), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000002");
+            assertEquals(rs.getInt("q"), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000001");
+            assertEquals(rs.getInt("q"), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000004");
+            assertEquals(rs.getInt("q"), 0);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000005");
+            assertEquals(rs.getInt("q"), 0);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "invalid001");
+            assertEquals(rs.getInt("q"), 0);
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testRightJoinWithAggregation() throws Exception {
+        String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.name, sum(quantity) FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN " 
+            + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" GROUP BY i.name ORDER BY i.name";
+        String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ i.\"item_id\" iid, sum(quantity) q FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN " 
+            + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" GROUP BY i.\"item_id\" ORDER BY q DESC NULLS LAST, iid";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "INVALID-1");
+            assertEquals(rs.getInt(2), 0);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T1");
+            assertEquals(rs.getInt(2), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T2");
+            assertEquals(rs.getInt(2), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T3");
+            assertEquals(rs.getInt(2), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T4");
+            assertEquals(rs.getInt(2), 0);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T5");
+            assertEquals(rs.getInt(2), 0);
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "T6");
+            assertEquals(rs.getInt(2), 6000);
+
+            assertFalse(rs.next());
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000006");
+            assertEquals(rs.getInt("q"), 6000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000003");
+            assertEquals(rs.getInt("q"), 5000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000002");
+            assertEquals(rs.getInt("q"), 3000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000001");
+            assertEquals(rs.getInt("q"), 1000);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000004");
+            assertEquals(rs.getInt("q"), 0);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "0000000005");
+            assertEquals(rs.getInt("q"), 0);
+            assertTrue (rs.next());
+            assertEquals(rs.getString("iid"), "invalid001");
+            assertEquals(rs.getInt("q"), 0);
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testLeftRightJoin() throws Exception {
+        String query1 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+        String query2 = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN " 
+                + "(" + JOIN_ITEM_TABLE_FULL_NAME + " i RIGHT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\")" 
+                + " ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S5");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S4");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S3");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testRightLeftJoin() throws Exception {
+        String query1 = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ITEM_TABLE_FULL_NAME + " i RIGHT JOIN " 
+                + JOIN_ORDER_TABLE_FULL_NAME + " o ON o.\"item_id\" = i.\"item_id\" LEFT JOIN "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\"";
+        String query2 = "SELECT \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN " 
+                + "(" + JOIN_ITEM_TABLE_FULL_NAME + " i LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\")" 
+                + " ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query1);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+            
+            statement = conn.prepareStatement(query2);
+            rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "INVALID-1");
+            assertNull(rs.getString(3));
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "S5");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testMultiLeftJoin() throws Exception {
+        String[] queries = {
+                "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN " 
+                        + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" LEFT JOIN "
+                        + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\"",
+                "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o LEFT JOIN " 
+                        + "(" + JOIN_ITEM_TABLE_FULL_NAME + " i LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\") " 
+                        + "ON o.\"item_id\" = i.\"item_id\" ORDER BY \"order_id\""};
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            for (String query : queries) {
+                PreparedStatement statement = conn.prepareStatement(query);
+                ResultSet rs = statement.executeQuery();
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000001");
+                assertEquals(rs.getString(2), "T1");
+                assertEquals(rs.getString(3), "S1");
+                assertEquals(rs.getInt(4), 1000);
+                assertNotNull(rs.getDate(5));
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000002");
+                assertEquals(rs.getString(2), "T6");
+                assertEquals(rs.getString(3), "S6");
+                assertEquals(rs.getInt(4), 2000);
+                assertNotNull(rs.getDate(5));
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000003");
+                assertEquals(rs.getString(2), "T2");
+                assertEquals(rs.getString(3), "S1");
+                assertEquals(rs.getInt(4), 3000);
+                assertNotNull(rs.getDate(5));
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000004");
+                assertEquals(rs.getString(2), "T6");
+                assertEquals(rs.getString(3), "S6");
+                assertEquals(rs.getInt(4), 4000);
+                assertNotNull(rs.getDate(5));
+                assertTrue (rs.next());
+                assertEquals(rs.getString(1), "000000000000005");
+                assertEquals(rs.getString(2), "T3");
+                assertEquals(rs.getString(3), "S2");
+                assertEquals(rs.getInt(4), 5000);
+                assertNotNull(rs.getDate(5));
+
+                assertFalse(rs.next());
+            }
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testMultiRightJoin() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN " 
+            + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+            + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "S5");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S4");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S3");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+
+    // Basically a copy of testMultiRightJoin, but with a very small result scan chunk size
+    // to test that repeated row keys within a single chunk are handled properly
+    @Test
+    public void testMultiRightJoin_SmallChunkSize() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, s.name, quantity, date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN "
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "1");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T5");
+            assertEquals(rs.getString(3), "S5");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S4");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertNull(rs.getString(2));
+            assertEquals(rs.getString(3), "S3");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertNull(rs.getString(1));
+            assertEquals(rs.getString(2), "T4");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 0);
+            assertNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "T1");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 1000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 2000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000003");
+            assertEquals(rs.getString(2), "T2");
+            assertEquals(rs.getString(3), "S1");
+            assertEquals(rs.getInt(4), 3000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getString(3), "S6");
+            assertEquals(rs.getInt(4), 4000);
+            assertNotNull(rs.getDate(5));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000005");
+            assertEquals(rs.getString(2), "T3");
+            assertEquals(rs.getString(3), "S2");
+            assertEquals(rs.getInt(4), 5000);
+            assertNotNull(rs.getDate(5));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testJoinWithWildcard() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ * FROM " + JOIN_ITEM_TABLE_FULL_NAME + " LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " supp ON " + JOIN_ITEM_TABLE_FULL_NAME + ".\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000001");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T1");
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 100);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 5);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 10);
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000001");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T1");
+            assertEquals(rs.getString("SUPP.supplier_id"), "0000000001");
+            assertEquals(rs.getString("supp.name"), "S1");
+            assertEquals(rs.getString("supp.phone"), "888-888-1111");
+            assertEquals(rs.getString("supp.address"), "101 YYY Street");
+            assertEquals(rs.getString("supp.loc_id"), "10001");            
+            assertTrue (rs.next());
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000002");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T2");
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 200);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 5);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 8);
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000001");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T2");
+            assertEquals(rs.getString("SUPP.supplier_id"), "0000000001");
+            assertEquals(rs.getString("supp.name"), "S1");
+            assertEquals(rs.getString("supp.phone"), "888-888-1111");
+            assertEquals(rs.getString("supp.address"), "101 YYY Street");
+            assertEquals(rs.getString("supp.loc_id"), "10001");            
+            assertTrue (rs.next());
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000003");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T3");
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 300);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 8);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 12);
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000002");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T3");
+            assertEquals(rs.getString("SUPP.supplier_id"), "0000000002");
+            assertEquals(rs.getString("supp.name"), "S2");
+            assertEquals(rs.getString("supp.phone"), "888-888-2222");
+            assertEquals(rs.getString("supp.address"), "202 YYY Street");
+            assertEquals(rs.getString("supp.loc_id"), "10002");            
+            assertTrue (rs.next());
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000004");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T4");
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 400);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 6);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 10);
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000002");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T4");
+            assertEquals(rs.getString("SUPP.supplier_id"), "0000000002");
+            assertEquals(rs.getString("supp.name"), "S2");
+            assertEquals(rs.getString("supp.phone"), "888-888-2222");
+            assertEquals(rs.getString("supp.address"), "202 YYY Street");
+            assertEquals(rs.getString("supp.loc_id"), "10002");            
+            assertTrue (rs.next());
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000005");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T5");
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 500);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 8);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 15);
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000005");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T5");
+            assertEquals(rs.getString("SUPP.supplier_id"), "0000000005");
+            assertEquals(rs.getString("supp.name"), "S5");
+            assertEquals(rs.getString("supp.phone"), "888-888-5555");
+            assertEquals(rs.getString("supp.address"), "505 YYY Street");
+            assertEquals(rs.getString("supp.loc_id"), "10005");            
+            assertTrue (rs.next());
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "0000000006");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "T6");
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 600);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 8);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 15);
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000006");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Item T6");
+            assertEquals(rs.getString("SUPP.supplier_id"), "0000000006");
+            assertEquals(rs.getString("supp.name"), "S6");
+            assertEquals(rs.getString("supp.phone"), "888-888-6666");
+            assertEquals(rs.getString("supp.address"), "606 YYY Street");
+            assertEquals(rs.getString("supp.loc_id"), "10006");            
+            assertTrue (rs.next());
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".item_id"), "invalid001");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".NAME"), "INVALID-1");
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".PRICE"), 0);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT1"), 0);
+            assertEquals(rs.getInt(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DISCOUNT2"), 0);
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".supplier_id"), "0000000000");
+            assertEquals(rs.getString(JOIN_ITEM_TABLE_DISPLAY_NAME + ".DESCRIPTION"), "Invalid item for join test");
+            assertNull(rs.getString("SUPP.supplier_id"));
+            assertNull(rs.getString("supp.name"));
+            assertNull(rs.getString("supp.phone"));
+            assertNull(rs.getString("supp.address"));
+            assertNull(rs.getString("supp.loc_id"));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testJoinWithTableWildcard() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ s.*, "+ JOIN_ITEM_TABLE_FULL_NAME + ".*, \"order_id\" FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o RIGHT JOIN " 
+                + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" RIGHT JOIN "
+                + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON i.\"supplier_id\" = s.\"supplier_id\" ORDER BY \"order_id\", s.\"supplier_id\" DESC";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            ResultSetMetaData md = rs.getMetaData();
+            assertEquals(md.getColumnCount(), 13);
+            
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000005");
+            assertEquals(rs.getString(2), "S5");
+            assertEquals(rs.getString(3), "888-888-5555");
+            assertEquals(rs.getString(4), "505 YYY Street");
+            assertEquals(rs.getString(5), "10005");
+            assertEquals(rs.getString(6), "0000000005");
+            assertEquals(rs.getString(7), "T5");
+            assertEquals(rs.getInt(8), 500);
+            assertEquals(rs.getInt(9), 8);
+            assertEquals(rs.getInt(10), 15);
+            assertEquals(rs.getString(11), "0000000005");
+            assertEquals(rs.getString(12), "Item T5");
+            assertNull(rs.getString(13));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000004");
+            assertEquals(rs.getString(2), "S4");
+            assertEquals(rs.getString(3), "888-888-4444");
+            assertEquals(rs.getString(4), "404 YYY Street");
+            assertNull(rs.getString(5));
+            assertNull(rs.getString(6));
+            assertNull(rs.getString(7));
+            assertEquals(rs.getInt(8), 0);
+            assertEquals(rs.getInt(9), 0);
+            assertEquals(rs.getInt(10), 0);
+            assertNull(rs.getString(11));
+            assertNull(rs.getString(12));            
+            assertNull(rs.getString(13));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000003");
+            assertEquals(rs.getString(2), "S3");
+            assertEquals(rs.getString(3), "888-888-3333");
+            assertEquals(rs.getString(4), "303 YYY Street");
+            assertNull(rs.getString(5));
+            assertNull(rs.getString(6));
+            assertNull(rs.getString(7));
+            assertEquals(rs.getInt(8), 0);
+            assertEquals(rs.getInt(9), 0);
+            assertEquals(rs.getInt(10), 0);
+            assertNull(rs.getString(11));
+            assertNull(rs.getString(12));            
+            assertNull(rs.getString(13));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "S2");
+            assertEquals(rs.getString(3), "888-888-2222");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getString(5), "10002");
+            assertEquals(rs.getString(6), "0000000004");
+            assertEquals(rs.getString(7), "T4");
+            assertEquals(rs.getInt(8), 400);
+            assertEquals(rs.getInt(9), 6);
+            assertEquals(rs.getInt(10), 10);
+            assertEquals(rs.getString(11), "0000000002");
+            assertEquals(rs.getString(12), "Item T4");
+            assertNull(rs.getString(13));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "S1");
+            assertEquals(rs.getString(3), "888-888-1111");
+            assertEquals(rs.getString(4), "101 YYY Street");
+            assertEquals(rs.getString(5), "10001");
+            assertEquals(rs.getString(6), "0000000001");
+            assertEquals(rs.getString(7), "T1");
+            assertEquals(rs.getInt(8), 100);
+            assertEquals(rs.getInt(9), 5);
+            assertEquals(rs.getInt(10), 10);
+            assertEquals(rs.getString(11), "0000000001");
+            assertEquals(rs.getString(12), "Item T1");
+            assertEquals(rs.getString(13), "000000000000001");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000006");
+            assertEquals(rs.getString(2), "S6");
+            assertEquals(rs.getString(3), "888-888-6666");
+            assertEquals(rs.getString(4), "606 YYY Street");
+            assertEquals(rs.getString(5), "10006");
+            assertEquals(rs.getString(6), "0000000006");
+            assertEquals(rs.getString(7), "T6");
+            assertEquals(rs.getInt(8), 600);
+            assertEquals(rs.getInt(9), 8);
+            assertEquals(rs.getInt(10), 15);
+            assertEquals(rs.getString(11), "0000000006");
+            assertEquals(rs.getString(12), "Item T6");
+            assertEquals(rs.getString(13), "000000000000002");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000001");
+            assertEquals(rs.getString(2), "S1");
+            assertEquals(rs.getString(3), "888-888-1111");
+            assertEquals(rs.getString(4), "101 YYY Street");
+            assertEquals(rs.getString(5), "10001");
+            assertEquals(rs.getString(6), "0000000002");
+            assertEquals(rs.getString(7), "T2");
+            assertEquals(rs.getInt(8), 200);
+            assertEquals(rs.getInt(9), 5);
+            assertEquals(rs.getInt(10), 8);
+            assertEquals(rs.getString(11), "0000000001");
+            assertEquals(rs.getString(12), "Item T2");
+            assertEquals(rs.getString(13), "000000000000003");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000006");
+            assertEquals(rs.getString(2), "S6");
+            assertEquals(rs.getString(3), "888-888-6666");
+            assertEquals(rs.getString(4), "606 YYY Street");
+            assertEquals(rs.getString(5), "10006");
+            assertEquals(rs.getString(6), "0000000006");
+            assertEquals(rs.getString(7), "T6");
+            assertEquals(rs.getInt(8), 600);
+            assertEquals(rs.getInt(9), 8);
+            assertEquals(rs.getInt(10), 15);
+            assertEquals(rs.getString(11), "0000000006");
+            assertEquals(rs.getString(12), "Item T6");
+            assertEquals(rs.getString(13), "000000000000004");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "0000000002");
+            assertEquals(rs.getString(2), "S2");
+            assertEquals(rs.getString(3), "888-888-2222");
+            assertEquals(rs.getString(4), "202 YYY Street");
+            assertEquals(rs.getString(5), "10002");
+            assertEquals(rs.getString(6), "0000000003");
+            assertEquals(rs.getString(7), "T3");
+            assertEquals(rs.getInt(8), 300);
+            assertEquals(rs.getInt(9), 8);
+            assertEquals(rs.getInt(10), 12);
+            assertEquals(rs.getString(11), "0000000002");
+            assertEquals(rs.getString(12), "Item T3");
+            assertEquals(rs.getString(13), "000000000000005");
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }        
+    }
+    
+    @Test
+    public void testJoinMultiJoinKeys() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ c.name, s.name FROM " + JOIN_CUSTOMER_TABLE_FULL_NAME + " c LEFT JOIN " + JOIN_SUPPLIER_TABLE_FULL_NAME + " s ON \"customer_id\" = \"supplier_id\" AND c.loc_id = s.loc_id AND substr(s.name, 2, 1) = substr(c.name, 2, 1) ORDER BY \"customer_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "C1");
+            assertEquals(rs.getString(2), "S1");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "C2");
+            assertNull(rs.getString(2));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "C3");
+            assertEquals(rs.getString(2), "S3");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "C4");
+            assertNull(rs.getString(2));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "C5");
+            assertEquals(rs.getString(2), "S5");
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "C6");
+            assertNull(rs.getString(2));
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testJoinWithDifferentNumericJoinKeyTypes() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", i.name, i.price, discount2, quantity FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o INNER JOIN " 
+            + JOIN_ITEM_TABLE_FULL_NAME + " i ON o.\"item_id\" = i.\"item_id\" AND o.price = (i.price * (100 - discount2)) / 100.0 WHERE quantity < 5000";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000004");
+            assertEquals(rs.getString(2), "T6");
+            assertEquals(rs.getInt(3), 600);
+            assertEquals(rs.getInt(4), 15);
+            assertEquals(rs.getInt(5), 4000);
+
+            assertFalse(rs.next());
+        } finally {
+            conn.close();
+        }
+    }
+    
+    @Test
+    public void testJoinWithDifferentDateJoinKeyTypes() throws Exception {
+        String query = "SELECT /*+ USE_SORT_MERGE_JOIN*/ \"order_id\", c.name, o.date FROM " + JOIN_ORDER_TABLE_FULL_NAME + " o INNER JOIN " 
+            + JOIN_CUSTOMER_TABLE_FULL_NAME + " c ON o.\"customer_id\" = c.\"customer_id\" AND o.date = c.date ORDER BY \"order_id\"";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000001");
+            assertEquals(rs.getString(2), "C4");
+            assertEquals(rs.getTimestamp(3), new Timestamp(format.parse("2013-11-22 14:22:56").getTime()));
+            assertTrue (rs.next());
+            assertEquals(rs.getString(1), "000000000000002");
+            assertEquals(rs.getString(2), "C3");
+            assertEquals(rs.getTimestamp(3), new Ti

<TRUNCATED>

[4/4] phoenix git commit: PHOENIX-1799 Support many-to-many joins

Posted by ma...@apache.org.
PHOENIX-1799 Support many-to-many joins


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ebc7ee42
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ebc7ee42
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ebc7ee42

Branch: refs/heads/4.0
Commit: ebc7ee42cdb2b05a293f54dc687ca975db9acbc3
Parents: 4a675d8
Author: maryannxue <ma...@apache.org>
Authored: Fri Nov 14 16:07:27 2014 -0500
Committer: maryannxue <ma...@apache.org>
Committed: Fri Nov 14 16:07:27 2014 -0500

----------------------------------------------------------------------
 .../apache/phoenix/end2end/SortMergeJoinIT.java | 2822 ++++++++++++++++++
 .../apache/phoenix/compile/GroupByCompiler.java |    4 +-
 .../apache/phoenix/compile/JoinCompiler.java    |  147 +-
 .../apache/phoenix/compile/OrderByCompiler.java |   11 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  191 +-
 .../phoenix/compile/QueryCompiler.java.orig     |  444 ---
 .../phoenix/compile/SubselectRewriter.java      |   23 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |    5 +
 .../apache/phoenix/compile/WhereCompiler.java   |    4 +-
 .../coprocessor/HashJoinRegionScanner.java      |   17 +-
 .../phoenix/execute/SortMergeJoinPlan.java      |  628 ++++
 .../apache/phoenix/execute/TupleProjector.java  |   59 +-
 .../phoenix/iterate/FilterResultIterator.java   |    3 +-
 .../phoenix/iterate/MappedByteBufferQueue.java  |  431 +++
 .../iterate/MappedByteBufferSortedQueue.java    |  372 +--
 .../java/org/apache/phoenix/parse/HintNode.java |    6 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |   11 +
 .../phoenix/parse/SelectStatementRewriter.java  |    2 +-
 .../org/apache/phoenix/schema/ValueBitSet.java  |   11 +-
 19 files changed, 4288 insertions(+), 903 deletions(-)
----------------------------------------------------------------------