You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by st...@apache.org on 2021/03/19 16:35:17 UTC

[phoenix] 02/02: PHOENIX-6422 Remove CorrelatePlan

This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit e4ea2c547d3093e7f6538e3cb2f7ea351ce4402a
Author: Istvan Toth <st...@apache.org>
AuthorDate: Fri Mar 19 09:42:32 2021 +0100

    PHOENIX-6422 Remove CorrelatePlan
---
 .../org/apache/phoenix/execute/CorrelatePlan.java  | 265 ------------------
 .../execute/visitor/AvgRowWidthVisitor.java        |   6 -
 .../phoenix/execute/visitor/ByteCountVisitor.java  |   6 -
 .../phoenix/execute/visitor/QueryPlanVisitor.java  |   1 -
 .../phoenix/execute/visitor/RowCountVisitor.java   |  11 -
 .../apache/phoenix/compile/QueryCompilerTest.java  |   6 -
 .../apache/phoenix/execute/CorrelatePlanTest.java  | 299 ---------------------
 7 files changed, 594 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
deleted file mode 100644
index 991ac5b..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.phoenix.compile.ExplainPlan;
-import org.apache.phoenix.compile.ExplainPlanAttributes;
-import org.apache.phoenix.compile.ExplainPlanAttributes
-    .ExplainPlanAttributesBuilder;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
-import org.apache.phoenix.execute.visitor.ByteCountVisitor;
-import org.apache.phoenix.execute.visitor.RowCountVisitor;
-import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
-import org.apache.phoenix.iterate.ParallelScanGrouper;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.optimize.Cost;
-import org.apache.phoenix.parse.JoinTableNode.JoinType;
-import org.apache.phoenix.schema.KeyValueSchema;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.ValueBitSet;
-import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.SchemaUtil;
-
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-
-public class CorrelatePlan extends DelegateQueryPlan {    
-    private final QueryPlan rhs;
-    private final String variableId;
-    private final JoinType joinType;
-    private final boolean isSingleValueOnly;
-    private final RuntimeContext runtimeContext;
-    private final KeyValueSchema joinedSchema;
-    private final KeyValueSchema lhsSchema;
-    private final KeyValueSchema rhsSchema;
-    private final int rhsFieldPosition;
-
-
-    public CorrelatePlan(QueryPlan lhs, QueryPlan rhs, String variableId, 
-            JoinType joinType, boolean isSingleValueOnly, 
-            RuntimeContext runtimeContext, PTable joinedTable, 
-            PTable lhsTable, PTable rhsTable, int rhsFieldPosition) {
-        super(lhs);
-        if (joinType != JoinType.Inner && joinType != JoinType.Left && joinType != JoinType.Semi && joinType != JoinType.Anti)
-            throw new IllegalArgumentException("Unsupported join type '" + joinType + "' by CorrelatePlan");
-        
-        this.rhs = rhs;
-        this.variableId = variableId;
-        this.joinType = joinType;
-        this.isSingleValueOnly = isSingleValueOnly;
-        this.runtimeContext = runtimeContext;
-        this.joinedSchema = buildSchema(joinedTable);
-        this.lhsSchema = buildSchema(lhsTable);
-        this.rhsSchema = buildSchema(rhsTable);
-        this.rhsFieldPosition = rhsFieldPosition;
-    }
-
-    private static KeyValueSchema buildSchema(PTable table) {
-        KeyValueSchemaBuilder builder = new KeyValueSchemaBuilder(0);
-        if (table != null) {
-            for (PColumn column : table.getColumns()) {
-                if (!SchemaUtil.isPKColumn(column)) {
-                    builder.addField(column);
-                }
-            }
-        }
-        return builder.build();
-    }
-
-    @Override
-    public ExplainPlan getExplainPlan() throws SQLException {
-        List<String> steps = Lists.newArrayList();
-        steps.add("NESTED-LOOP-JOIN (" + joinType.toString().toUpperCase() + ") TABLES");
-        ExplainPlan lhsExplainPlan = delegate.getExplainPlan();
-        List<String> lhsPlanSteps = lhsExplainPlan.getPlanSteps();
-        ExplainPlanAttributes lhsPlanAttributes =
-            lhsExplainPlan.getPlanStepsAsAttributes();
-        ExplainPlanAttributesBuilder lhsPlanBuilder =
-            new ExplainPlanAttributesBuilder(lhsPlanAttributes);
-        lhsPlanBuilder.setAbstractExplainPlan("NESTED-LOOP-JOIN ("
-            + joinType.toString().toUpperCase() + ")");
-
-        for (String step : lhsPlanSteps) {
-            steps.add("    " + step);
-        }
-        steps.add("AND" + (rhsSchema.getFieldCount() == 0 ? " (SKIP MERGE)" : ""));
-
-        ExplainPlan rhsExplainPlan = rhs.getExplainPlan();
-        List<String> rhsPlanSteps = rhsExplainPlan.getPlanSteps();
-        ExplainPlanAttributes rhsPlanAttributes =
-            rhsExplainPlan.getPlanStepsAsAttributes();
-        ExplainPlanAttributesBuilder rhsPlanBuilder =
-            new ExplainPlanAttributesBuilder(rhsPlanAttributes);
-
-        lhsPlanBuilder.setRhsJoinQueryExplainPlan(rhsPlanBuilder.build());
-
-        for (String step : rhsPlanSteps) {
-            steps.add("    " + step);
-        }
-        return new ExplainPlan(steps, lhsPlanBuilder.build());
-    }
-
-    @Override
-    public ResultIterator iterator(final ParallelScanGrouper scanGrouper, final Scan scan)
-            throws SQLException {
-        return new CorrelateResultIterator(scanGrouper, scan) ;
-    }
-
-    @Override
-    public Integer getLimit() {
-        return null;
-    }
-
-    @Override
-    public <T> T accept(QueryPlanVisitor<T> visitor) {
-        return visitor.visit(this);
-    }
-
-    public QueryPlan getRhsPlan() {
-        return rhs;
-    }
-
-    @Override
-    public Cost getCost() {
-        Double lhsByteCount = delegate.accept(new ByteCountVisitor());
-        Double rhsRowCount = rhs.accept(new RowCountVisitor());
-
-        if (lhsByteCount == null || rhsRowCount == null) {
-            return Cost.UNKNOWN;
-        }
-
-        Cost cost = new Cost(0, 0, lhsByteCount * rhsRowCount);
-        Cost lhsCost = delegate.getCost();
-        return cost.plus(lhsCost).plus(rhs.getCost());
-    }
-
-    private class CorrelateResultIterator implements ResultIterator {
-        private final ValueBitSet destBitSet = ValueBitSet.newInstance(joinedSchema);
-        private final ValueBitSet lhsBitSet = ValueBitSet.newInstance(lhsSchema);
-        private final ValueBitSet rhsBitSet =
-                (joinType == JoinType.Semi || joinType == JoinType.Anti) ?
-                        ValueBitSet.EMPTY_VALUE_BITSET
-                        : ValueBitSet.newInstance(rhsSchema);
-        private final ResultIterator iter;
-        private ResultIterator rhsIter = null;
-        private Tuple current = null;
-        private boolean closed = false;
-
-        private CorrelateResultIterator(ParallelScanGrouper scanGrouper, Scan scan)
-                throws SQLException {
-            iter = delegate.iterator(scanGrouper, scan);
-        }
-
-        @Override
-        public void close() throws SQLException {
-            if (!closed) {
-                closed = true;
-                iter.close();
-                if (rhsIter != null) {
-                    rhsIter.close();
-                }
-            }
-        }
-
-        @Override
-        public Tuple next() throws SQLException {
-            if (closed)
-                return null;
-
-            Tuple rhsCurrent = null;
-            if (rhsIter != null) {
-                rhsCurrent = rhsIter.next();
-                if (rhsCurrent == null) {
-                    rhsIter.close();
-                    rhsIter = null;
-                } else if (isSingleValueOnly) {
-                    throw new SQLExceptionInfo.Builder(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS)
-                            .build().buildException();
-                }
-            }
-            while (rhsIter == null) {
-                current = iter.next();
-                if (current == null) {
-                    close();
-                    return null;
-                }
-                runtimeContext.setCorrelateVariableValue(variableId, current);
-                rhsIter = rhs.iterator();
-                rhsCurrent = rhsIter.next();
-                if ((rhsCurrent == null && (joinType == JoinType.Inner || joinType == JoinType.Semi))
-                        || (rhsCurrent != null && joinType == JoinType.Anti)) {
-                    rhsIter.close();
-                    rhsIter = null;
-                }
-            }
-
-            Tuple joined;
-            try {
-                joined = rhsBitSet == ValueBitSet.EMPTY_VALUE_BITSET ?
-                        current : TupleProjector.mergeProjectedValue(
-                        convertLhs(current), destBitSet,
-                        rhsCurrent, rhsBitSet, rhsFieldPosition, true);
-            } catch (IOException e) {
-                throw new SQLException(e);
-            }
-
-            if ((joinType == JoinType.Semi || rhsCurrent == null) && rhsIter != null) {
-                rhsIter.close();
-                rhsIter = null;
-            }
-
-            return joined;
-        }
-
-        @Override
-        public void explain(List<String> planSteps) { }
-
-        @Override
-        public void explain(List<String> planSteps,
-                ExplainPlanAttributesBuilder explainPlanAttributesBuilder) {
-        }
-
-        private ProjectedValueTuple convertLhs(Tuple lhs) throws IOException {
-            ProjectedValueTuple tuple;
-            if (lhs instanceof ProjectedValueTuple) {
-                tuple = (ProjectedValueTuple) lhs;
-            } else {
-                ImmutableBytesWritable ptr = getContext().getTempPtr();
-                TupleProjector.decodeProjectedValue(lhs, ptr);
-                lhsBitSet.clear();
-                lhsBitSet.or(ptr);
-                int bitSetLen = lhsBitSet.getEstimatedLength();
-                tuple = new ProjectedValueTuple(lhs, lhs.getValue(0).getTimestamp(),
-                        ptr.get(), ptr.getOffset(), ptr.getLength(), bitSetLen);
-
-            }
-            return tuple;
-        }
-    }
-
-}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
index 9525747..ada8664 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/AvgRowWidthVisitor.java
@@ -23,7 +23,6 @@ import org.apache.phoenix.compile.TraceQueryPlan;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.ClientAggregatePlan;
 import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.CorrelatePlan;
 import org.apache.phoenix.execute.CursorFetchPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.LiteralResultIterationPlan;
@@ -156,11 +155,6 @@ public class AvgRowWidthVisitor implements QueryPlanVisitor<Double> {
     }
 
     @Override
-    public Double visit(CorrelatePlan plan) {
-        return plan.getDelegate().accept(this);
-    }
-
-    @Override
     public Double visit(CursorFetchPlan plan) {
         return plan.getDelegate().accept(this);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
index 61a2895..7e9e88b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/ByteCountVisitor.java
@@ -23,7 +23,6 @@ import org.apache.phoenix.compile.TraceQueryPlan;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.ClientAggregatePlan;
 import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.CorrelatePlan;
 import org.apache.phoenix.execute.CursorFetchPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.LiteralResultIterationPlan;
@@ -94,11 +93,6 @@ public class ByteCountVisitor implements QueryPlanVisitor<Double> {
     }
 
     @Override
-    public Double visit(CorrelatePlan plan) {
-        return getByteCountFromRowCountAndRowWidth(plan);
-    }
-
-    @Override
     public Double visit(CursorFetchPlan plan) {
         return getByteCountFromRowCountAndRowWidth(plan);
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
index a7ae3af..9229f9f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/QueryPlanVisitor.java
@@ -39,7 +39,6 @@ public interface QueryPlanVisitor<E> {
     E visit(SortMergeJoinPlan plan);
     E visit(UnionPlan plan);
     E visit(UnnestArrayPlan plan);
-    E visit(CorrelatePlan plan);
     E visit(CursorFetchPlan plan);
     E visit(ListJarsQueryPlan plan);
     E visit(TraceQueryPlan plan);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
index 58ceea9..81c39a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/visitor/RowCountVisitor.java
@@ -26,7 +26,6 @@ import org.apache.phoenix.compile.TraceQueryPlan;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.ClientAggregatePlan;
 import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.CorrelatePlan;
 import org.apache.phoenix.execute.CursorFetchPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.LiteralResultIterationPlan;
@@ -206,16 +205,6 @@ public class RowCountVisitor implements QueryPlanVisitor<Double> {
     }
 
     @Override
-    public Double visit(CorrelatePlan plan) {
-        Double lhsRows = plan.getDelegate().accept(this);
-        if (lhsRows != null) {
-            return lhsRows * SEMI_OR_ANTI_JOIN_FACTOR;
-        }
-
-        return null;
-    }
-
-    @Override
     public Double visit(CursorFetchPlan plan) {
         return plan.getDelegate().accept(this);
     }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 16f9489..17c369d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -59,7 +59,6 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.ClientAggregatePlan;
 import org.apache.phoenix.execute.ClientScanPlan;
-import org.apache.phoenix.execute.CorrelatePlan;
 import org.apache.phoenix.execute.CursorFetchPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.HashJoinPlan.HashSubPlan;
@@ -5240,11 +5239,6 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
         }
 
         @Override
-        public List<QueryPlan> visit(CorrelatePlan plan) {
-            return Collections.emptyList();
-        }
-
-        @Override
         public List<QueryPlan> visit(CursorFetchPlan plan) {
             return Collections.emptyList();
         }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java b/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
deleted file mode 100644
index 0531cf8..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/execute/CorrelatePlanTest.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.execute;
-
-import static org.apache.phoenix.query.QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
-import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
-import static org.apache.phoenix.util.PhoenixRuntime.CONNECTIONLESS;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
-import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.phoenix.thirdparty.com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.phoenix.compile.ColumnResolver;
-import org.apache.phoenix.compile.FromCompiler;
-import org.apache.phoenix.compile.JoinCompiler;
-import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
-import org.apache.phoenix.compile.QueryPlan;
-import org.apache.phoenix.compile.RowProjector;
-import org.apache.phoenix.compile.SequenceManager;
-import org.apache.phoenix.compile.StatementContext;
-import org.apache.phoenix.compile.TupleProjectionCompiler;
-import org.apache.phoenix.coprocessor.MetaDataProtocol;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.expression.ComparisonExpression;
-import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression;
-import org.apache.phoenix.expression.Expression;
-import org.apache.phoenix.expression.LiteralExpression;
-import org.apache.phoenix.expression.ProjectedColumnExpression;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixStatement;
-import org.apache.phoenix.parse.JoinTableNode.JoinType;
-import org.apache.phoenix.parse.ParseNodeFactory;
-import org.apache.phoenix.parse.SelectStatement;
-import org.apache.phoenix.schema.ColumnRef;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.PName;
-import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.EncodedCQCounter;
-import org.apache.phoenix.schema.PTable.ImmutableStorageScheme;
-import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
-import org.apache.phoenix.schema.PTableImpl;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.phoenix.schema.TableRef;
-import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.junit.Test;
-
-import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
-
-public class CorrelatePlanTest {
-    
-    private static final StatementContext CONTEXT;
-    static {
-        try {
-            PhoenixConnection connection = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + CONNECTIONLESS).unwrap(PhoenixConnection.class);
-            PhoenixStatement stmt = new PhoenixStatement(connection);
-            ColumnResolver resolver = FromCompiler.getResolverForQuery(SelectStatement.SELECT_ONE, connection);
-            CONTEXT = new StatementContext(stmt, resolver, new Scan(), new SequenceManager(stmt));
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
-    }
-    
-    private static final Object[][] LEFT_RELATION = new Object[][] {
-            {1, "1"},
-            {2, "2"},
-            {3, "3"},
-            {4, "4"},
-            {5, "5"},
-    };
-    
-    private static final Object[][] RIGHT_RELATION = new Object[][] {
-            {"2", 20},
-            {"2", 40},
-            {"5", 50},
-            {"6", 60},
-            {"5", 100},
-            {"1", 10},
-            {"3", 30},
-    };        
-    
-    @Test
-    public void testCorrelatePlanWithInnerJoinType() throws SQLException {
-        Object[][] expected = new Object[][] {
-                {1, "1", "1", 10},
-                {2, "2", "2", 20},
-                {2, "2", "2", 40},
-                {3, "3", "3", 30},
-                {5, "5", "5", 50},
-                {5, "5", "5", 100},
-        };
-        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, expected);
-    }
-    
-    @Test
-    public void testCorrelatePlanWithLeftJoinType() throws SQLException {
-        Object[][] expected = new Object[][] {
-                {1, "1", "1", 10},
-                {2, "2", "2", 20},
-                {2, "2", "2", 40},
-                {3, "3", "3", 30},
-                {4, "4", null, null},
-                {5, "5", "5", 50},
-                {5, "5", "5", 100},
-        };
-        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Left, expected);
-    }
-    
-    @Test
-    public void testCorrelatePlanWithSemiJoinType() throws SQLException {
-        Object[][] expected = new Object[][] {
-                {1, "1"},
-                {2, "2"},
-                {3, "3"},
-                {5, "5"},
-        };
-        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Semi, expected);
-    }
-    
-    @Test
-    public void testCorrelatePlanWithAntiJoinType() throws SQLException {
-        Object[][] expected = new Object[][] {
-                {4, "4"},
-        };
-        testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Anti, expected);
-    }
-    
-    @Test
-    public void testCorrelatePlanWithSingleValueOnly() throws SQLException {
-        Object[][] expected = new Object[][] {
-                {1, "1", "1", 10},
-                {2, "2", "2", 20},
-                {2, "2", "2", 40},
-        };
-        try {
-            testCorrelatePlan(LEFT_RELATION, RIGHT_RELATION, 1, 0, JoinType.Inner, expected);
-        } catch (SQLException e) {
-            assertEquals(SQLExceptionCode.SINGLE_ROW_SUBQUERY_RETURNS_MULTIPLE_ROWS.getErrorCode(), e.getErrorCode());
-        }
-        
-        Object[][] rightRelation = new Object[][] {
-                {"2", 20},
-                {"6", 60},
-                {"5", 100},
-                {"1", 10},
-        };        
-        expected = new Object[][] {
-                {1, "1", "1", 10},
-                {2, "2", "2", 20},
-                {5, "5", "5", 100},
-        };
-        testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, expected);
-    }
-    
-    @Test
-    public void testCorrelatePlanWithSingleValueOnlyAndOffset() throws SQLException {
-        Integer offset = 1;
-        Object[][] rightRelation = new Object[][] {
-                {"6", 60},
-                {"2", 20},
-                {"5", 100},
-                {"1", 10},
-        };
-        Object[][] expected = new Object[][] {
-                {2, "2", "2", 20},
-                {5, "5", "5", 100},
-        };
-        testCorrelatePlan(LEFT_RELATION, rightRelation, 1, 0, JoinType.Inner, expected, offset);
-    }
-
-    private void testCorrelatePlan(Object[][] leftRelation, Object[][] rightRelation, int leftCorrelColumn, int rightCorrelColumn, JoinType type, Object[][] expectedResult) throws SQLException {
-        testCorrelatePlan(leftRelation, rightRelation, leftCorrelColumn, rightCorrelColumn, type, expectedResult, null);
-    }
-
-    private void testCorrelatePlan(Object[][] leftRelation, Object[][] rightRelation, int leftCorrelColumn,
-            int rightCorrelColumn, JoinType type, Object[][] expectedResult, Integer offset) throws SQLException {
-        TableRef leftTable = createProjectedTableFromLiterals(leftRelation[0]);
-        TableRef rightTable = createProjectedTableFromLiterals(rightRelation[0]);
-        String varName = "$cor0";
-        RuntimeContext runtimeContext = new RuntimeContextImpl();
-        runtimeContext.defineCorrelateVariable(varName, leftTable);
-        QueryPlan leftPlan = newLiteralResultIterationPlan(leftRelation, offset);
-        QueryPlan rightPlan = newLiteralResultIterationPlan(rightRelation, offset);
-        Expression columnExpr = new ColumnRef(rightTable, rightCorrelColumn).newColumnExpression();
-        Expression fieldAccess = new CorrelateVariableFieldAccessExpression(runtimeContext, varName, new ColumnRef(leftTable, leftCorrelColumn).newColumnExpression());
-        Expression filter = ComparisonExpression.create(CompareOp.EQUAL, Arrays.asList(columnExpr, fieldAccess), CONTEXT.getTempPtr(), false);
-        rightPlan = new ClientScanPlan(CONTEXT, SelectStatement.SELECT_ONE, rightTable, RowProjector.EMPTY_PROJECTOR,
-                null, null, filter, OrderBy.EMPTY_ORDER_BY, rightPlan);
-        PTable joinedTable = JoinCompiler.joinProjectedTables(leftTable.getTable(), rightTable.getTable(), type);
-        CorrelatePlan correlatePlan = new CorrelatePlan(leftPlan, rightPlan, varName, type, false, runtimeContext, joinedTable, leftTable.getTable(), rightTable.getTable(), leftTable.getTable().getColumns().size());
-        ResultIterator iter = correlatePlan.iterator();
-        ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-        for (Object[] row : expectedResult) {
-            Tuple next = iter.next();
-            assertNotNull(next);
-            for (int i = 0; i < row.length; i++) {
-                PColumn column = joinedTable.getColumns().get(i);
-                boolean eval = new ProjectedColumnExpression(column, joinedTable, column.getName().getString()).evaluate(next, ptr);
-                Object o = eval ? column.getDataType().toObject(ptr) : null;
-                assertEquals(row[i], o);
-            }
-        }
-    }
-
-    private QueryPlan newLiteralResultIterationPlan(Object[][] rows, Integer offset) throws SQLException {
-        List<Tuple> tuples = Lists.newArrayList();
-        Tuple baseTuple = new SingleKeyValueTuple(KeyValue.LOWESTKEY);
-        for (Object[] row : rows) {
-            Expression[] exprs = new Expression[row.length];
-            for (int i = 0; i < row.length; i++) {
-                exprs[i] = LiteralExpression.newConstant(row[i]);
-            }
-            TupleProjector projector = new TupleProjector(exprs);
-            tuples.add(projector.projectResults(baseTuple));
-        }
-        
-        return new LiteralResultIterationPlan(tuples, CONTEXT, SelectStatement.SELECT_ONE, TableRef.EMPTY_TABLE_REF,
-                RowProjector.EMPTY_PROJECTOR, null, offset, OrderBy.EMPTY_ORDER_BY, null);
-    }
-
-
-    private TableRef createProjectedTableFromLiterals(Object[] row) {
-        List<PColumn> columns = Lists.<PColumn>newArrayList();
-        for (int i = 0; i < row.length; i++) {
-            String name = ParseNodeFactory.createTempAlias();
-            Expression expr = LiteralExpression.newConstant(row[i]);
-            PName colName = PNameFactory.newName(name);
-            columns.add(new PColumnImpl(PNameFactory.newName(name), PNameFactory.newName(VALUE_COLUMN_FAMILY),
-                    expr.getDataType(), expr.getMaxLength(), expr.getScale(), expr.isNullable(),
-                    i, expr.getSortOrder(), null, null, false, name, false, false, colName.getBytes(), HConstants.LATEST_TIMESTAMP));
-        }
-        try {
-            PTable pTable =  new PTableImpl.Builder()
-                    .setType(PTableType.SUBQUERY)
-                    .setTimeStamp(MetaDataProtocol.MIN_TABLE_TIMESTAMP)
-                    .setIndexDisableTimestamp(0L)
-                    .setSequenceNumber(PTable.INITIAL_SEQ_NUM)
-                    .setImmutableRows(false)
-                    .setDisableWAL(false)
-                    .setMultiTenant(false)
-                    .setStoreNulls(false)
-                    .setUpdateCacheFrequency(0)
-                    .setNamespaceMapped(Boolean.FALSE)
-                    .setAppendOnlySchema(false)
-                    .setImmutableStorageScheme(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)
-                    .setQualifierEncodingScheme(QualifierEncodingScheme.NON_ENCODED_QUALIFIERS)
-                    .setBaseColumnCount(BASE_TABLE_BASE_COLUMN_COUNT)
-                    .setEncodedCQCounter(EncodedCQCounter.NULL_COUNTER)
-                    .setUseStatsForParallelization(true)
-                    .setExcludedColumns(ImmutableList.of())
-                    .setSchemaName(PName.EMPTY_NAME)
-                    .setTableName(PName.EMPTY_NAME)
-                    .setRowKeyOrderOptimizable(true)
-                    .setIndexes(Collections.emptyList())
-                    .setPhysicalNames(ImmutableList.of())
-                    .setColumns(columns)
-                    .build();
-            TableRef sourceTable = new TableRef(pTable);
-            List<ColumnRef> sourceColumnRefs = Lists.<ColumnRef> newArrayList();
-            for (PColumn column : sourceTable.getTable().getColumns()) {
-                sourceColumnRefs.add(new ColumnRef(sourceTable, column.getPosition()));
-            }
-        
-            return new TableRef(TupleProjectionCompiler.createProjectedTable(sourceTable, sourceColumnRefs, false));
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }        
-    }
-}