You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by vl...@apache.org on 2014/12/12 08:27:02 UTC

[1/3] incubator-calcite git commit: [CALCITE-483][CALCITE-489] Update Correlate mechanics and implement EnumerableCorrelate (aka nested loops join)

Repository: incubator-calcite
Updated Branches:
  refs/heads/master 8e196a41c -> 696da1685


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 4958495..cba8924 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -290,13 +290,13 @@ ProjectRel(EXPR$0=[_ISO-8859-1'a'], EXPR$1=[$SLICE($2)])
         <Resource name="plan">
             <![CDATA[
 LogicalProject(EXPR$0=['abc'], EXPR$1=[$SLICE($9)])
-  Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset7, var1=offset5]])
+  LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{5, 7}])
     LogicalTableScan(table=[[CATALOG, SALES, EMP]])
     Collect(field=[EXPR$0])
       LogicalUnion(all=[true])
         LogicalProject(EXPR$0=[$cor0.DEPTNO])
           LogicalValues(tuples=[[{ 0 }]])
-        LogicalProject(EXPR$0=[$cor1.SAL])
+        LogicalProject(EXPR$0=[$cor0.SAL])
           LogicalValues(tuples=[[{ 0 }]])
 ]]>
         </Resource>
@@ -321,7 +321,7 @@ ProjectRel(EXPR$0=[_ISO-8859-1'abc'], EXPR$1=[$SLICE($8)])
         <Resource name="plan">
             <![CDATA[
 LogicalProject(DEPTNO=[$0], NAME=[$1], EMPSET=[$2])
-  Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset0]])
+  LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{0}])
     LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
     Collect(field=[EXPR$0])
       LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
@@ -379,7 +379,7 @@ ProjectRel(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5],
             <![CDATA[
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
-    Correlator(condition=[true], joinType=[left], correlations=[[var0=offset7]])
+    LogicalCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{7}])
       LogicalTableScan(table=[[CATALOG, SALES, EMP]])
       LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         LogicalProject($f0=[true])
@@ -410,7 +410,7 @@ ProjectRel(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5],
 LogicalProject(EXPR$0=[$0])
   Uncollect
     LogicalProject(EXPR$0=[$SLICE($2)])
-      Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset0]])
+      LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{0}])
         LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
         Collect(field=[EXPR$0])
           LogicalUnion(all=[true])
@@ -440,7 +440,7 @@ ProjectRel(EXPR$0=[$0])
         <Resource name="plan">
             <![CDATA[
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], DEPTNO0=[$9], NAME=[$10])
-  Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset7]])
+  LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{7}])
     LogicalTableScan(table=[[CATALOG, SALES, EMP]])
     LogicalProject(DEPTNO=[$0], NAME=[$1])
       LogicalFilter(condition=[=($cor0.DEPTNO, $0)])
@@ -1595,14 +1595,14 @@ where exists (
             <![CDATA[
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
-    Correlator(condition=[true], joinType=[left], correlations=[[var1=offset7, var0=offset7]])
+    LogicalCorrelate(correlation=[$cor1], joinType=[LEFT], requiredColumns=[{7}])
       LogicalTableScan(table=[[CATALOG, SALES, EMP]])
       LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         LogicalProject($f0=[true])
           LogicalProject(EXPR$0=[1])
             LogicalFilter(condition=[<=($0, $cor1.DEPTNO)])
               LogicalProject(DEPTNO=[$0], NAME=[$1])
-                LogicalFilter(condition=[>=($0, $cor0.DEPTNO)])
+                LogicalFilter(condition=[>=($0, $cor1.DEPTNO)])
                   LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
 ]]>
         </Resource>
@@ -1688,6 +1688,84 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testNestedCorrelations">
+        <Resource name="sql">
+            <![CDATA[select * from (select 2+deptno d2, 3+deptno d3 from emp) e
+ where exists (select 1 from (select deptno+1 d1 from dept) d
+ where d1=e.d2 and exists (select 2 from (select deptno+4 d4, deptno+5 d5, deptno+6 d6 from dept)
+ where d4=d.d1 and d5=d.d1 and d6=e.d3)))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(D2=[$0], D3=[$1])
+  LogicalFilter(condition=[IS NOT NULL($2)])
+    LogicalCorrelate(correlation=[$cor3], joinType=[LEFT], requiredColumns=[{0, 1}])
+      LogicalProject(D2=[+(2, $7)], D3=[+(3, $7)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalAggregate(group=[{}], agg#0=[MIN($0)])
+        LogicalProject($f0=[true])
+          LogicalProject(EXPR$0=[1])
+            LogicalFilter(condition=[AND(=($0, $cor3.D2), IS NOT NULL($1))])
+              LogicalCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{0}])
+                LogicalProject(D1=[+($0, 1)])
+                  LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+                LogicalAggregate(group=[{}], agg#0=[MIN($0)])
+                  LogicalProject($f0=[true])
+                    LogicalProject(EXPR$0=[2])
+                      LogicalFilter(condition=[AND(=($0, $cor0.D1), =($1, $cor0.D1), =($2, $cor3.D3))])
+                        LogicalProject(D4=[+($0, 4)], D5=[+($0, 5)], D6=[+($0, 6)])
+                          LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testNestedCorrelationsDecorrelated">
+        <Resource name="sql">
+            <![CDATA[select * from (select 2+deptno d2, 3+deptno d3 from emp) e
+ where exists (select 1 from (select deptno+1 d1 from dept) d
+ where d1=e.d2 and exists (select 2 from (select deptno+4 d4, deptno+5 d5, deptno+6 d6 from dept)
+ where d4=d.d1 and d5=d.d1 and d6=e.d3)))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(D2=[$0], D3=[$1])
+  LogicalFilter(condition=[IS NOT NULL($2)])
+    LogicalProject(D2=[$0], D3=[$1], $f0=[$4])
+      LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
+        LogicalProject(D2=[+(2, $7)], D3=[+(3, $7)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+        LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
+          LogicalProject($f00=[$1], $f03=[$2], $f0=[$0])
+            LogicalProject($f0=[true], $f00=[$1], $f03=[$2])
+              LogicalProject(EXPR$0=[1], $f00=[$3], $f03=[$2])
+                LogicalFilter(condition=[AND(=($0, $3), IS NOT NULL($1))])
+                  LogicalJoin(condition=[true], joinType=[inner])
+                    LogicalProject(D1=[$0], $f0=[$4], $f03=[$3])
+                      LogicalJoin(condition=[AND(=($0, $1), =($0, $2))], joinType=[left])
+                        LogicalProject(D1=[+($0, 1)])
+                          LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+                        LogicalAggregate(group=[{0, 1, 2}], agg#0=[MIN($3)])
+                          LogicalProject(D1=[$1], D12=[$2], $f03=[$3], $f0=[$0])
+                            LogicalProject($f0=[true], D1=[$1], D12=[$2], $f03=[$3])
+                              LogicalProject(EXPR$0=[2], D1=[$3], D12=[$3], $f0=[$4])
+                                LogicalFilter(condition=[AND(=($0, $3), =($1, $3), =($2, $4))])
+                                  LogicalJoin(condition=[true], joinType=[inner])
+                                    LogicalProject(D4=[+($0, 4)], D5=[+($0, 5)], D6=[+($0, 6)])
+                                      LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+                                    LogicalJoin(condition=[true], joinType=[inner])
+                                      LogicalAggregate(group=[{0}])
+                                        LogicalProject(D1=[+($0, 1)])
+                                          LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+                                      LogicalAggregate(group=[{0}])
+                                        LogicalProject($f0=[$1])
+                                          LogicalProject(D2=[+(2, $7)], D3=[+(3, $7)])
+                                            LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+                    LogicalAggregate(group=[{0}])
+                      LogicalProject($f0=[$0])
+                        LogicalProject(D2=[+(2, $7)], D3=[+(3, $7)])
+                          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testWithInsideWhereExistsDecorrelate">
         <Resource name="sql">
             <![CDATA[select * from emp
@@ -1700,12 +1778,12 @@ where exists (
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
     LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], $f0=[$11])
-      LogicalJoin(condition=[AND(=($7, $10), =($7, $9))], joinType=[left])
+      LogicalJoin(condition=[AND(=($7, $9), =($7, $10))], joinType=[left])
         LogicalTableScan(table=[[CATALOG, SALES, EMP]])
         LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
-          LogicalProject($f01=[$1], $f00=[$2], $f0=[$0])
-            LogicalProject($f0=[true], $f01=[$1], $f00=[$2])
-              LogicalProject(EXPR$0=[1], $f0=[$2], $f00=[$3])
+          LogicalProject($f00=[$1], $f02=[$2], $f0=[$0])
+            LogicalProject($f0=[true], $f00=[$1], $f02=[$2])
+              LogicalProject(EXPR$0=[1], $f00=[$3], $f0=[$2])
                 LogicalFilter(condition=[<=($0, $3)])
                   LogicalJoin(condition=[true], joinType=[inner])
                     LogicalProject(DEPTNO=[$0], NAME=[$1], $f0=[$2])
@@ -1754,7 +1832,7 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$
             <![CDATA[
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
-    Correlator(condition=[true], joinType=[left], correlations=[[var0=offset7]])
+    LogicalCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{7}])
       LogicalTableScan(table=[[CATALOG, SALES, EMP]])
       LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         LogicalProject($f0=[true])
@@ -1775,7 +1853,7 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
     LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6], DEPTNO=[$7], SLACKER=[$8], $f0=[$9])
-      Correlator(condition=[true], joinType=[left], correlations=[[var0=offset7]])
+      LogicalCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{7}])
         LogicalTableScan(table=[[CATALOG, SALES, EMP]])
         LogicalAggregate(group=[{}], agg#0=[MIN($0)])
           LogicalProject($f0=[true])
@@ -1793,7 +1871,7 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$
         <Resource name="plan">
             <![CDATA[
 LogicalProject(DEPTNO=[$0], NAME=[$1], EXPR$0=[$2])
-  Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset0]])
+  LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{0}])
     LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
     Uncollect
       LogicalProject(EXPR$0=[$SLICE($0)])

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/main/java/org/apache/calcite/linq4j/CorrelateJoinType.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/CorrelateJoinType.java b/linq4j/src/main/java/org/apache/calcite/linq4j/CorrelateJoinType.java
new file mode 100644
index 0000000..322567e
--- /dev/null
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/CorrelateJoinType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j;
+
+/**
+ * Specifies the type of correlation operation: inner, left, semi, or anti.
+ */
+public enum CorrelateJoinType {
+  /**
+   * Inner join
+   */
+  INNER,
+
+  /**
+   * Left-outer join
+   */
+  LEFT,
+
+  /**
+   * Semi-join
+   * <p>Similar to from A ... where a in (select b from B ...)</p>
+   */
+  SEMI,
+
+  /**
+   * Anti-join
+   * <p>Similar to from A ... where a NOT in (select b from B ...)</p>
+   * <p>Note: if B.b is nullable and B has nulls, no rows must be returned</p>
+   */
+  ANTI;
+}
+
+// End SemiJoinType.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java b/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java
index a0853bf..c0c2b4c 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java
@@ -370,6 +370,13 @@ public abstract class DefaultEnumerable<T> implements OrderedEnumerable<T> {
         generateNullsOnRight);
   }
 
+  public <TInner, TResult> Enumerable<TResult> correlateJoin(
+      CorrelateJoinType joinType, Function1<T, Enumerable<TInner>> inner,
+      Function2<T, TInner, TResult> resultSelector) {
+    return EnumerableDefaults.correlateJoin(joinType, getThis(), inner,
+        resultSelector);
+  }
+
   public T last() {
     return EnumerableDefaults.last(getThis());
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
index b2f64a7..4be759c 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
@@ -1033,6 +1033,105 @@ public abstract class EnumerableDefaults {
   }
 
   /**
+   * Returns elements of {@code outer} for which there is a member of
+   * {@code inner} with a matching key. A specified
+   * {@code EqualityComparer<TSource>} is used to compare keys.
+   */
+  public static <TSource, TInner, TResult> Enumerable<TResult> correlateJoin(
+      final CorrelateJoinType joinType, final Enumerable<TSource> outer,
+      final Function1<TSource, Enumerable<TInner>> inner,
+      final Function2<TSource, TInner, TResult> resultSelector) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new Enumerator<TResult>() {
+          private Enumerator<TSource> outerEnumerator = outer.enumerator();
+          private Enumerator<TInner> innerEnumerator;
+          TSource outerValue;
+          TInner innerValue;
+          int state = 0; // 0 -- moving outer, 1 moving inner;
+
+          public TResult current() {
+            return resultSelector.apply(outerValue, innerValue);
+          }
+
+          public boolean moveNext() {
+            while (true) {
+              switch (state) {
+              case 0:
+                // move outer
+                if (!outerEnumerator.moveNext()) {
+                  return false;
+                }
+                outerValue = outerEnumerator.current();
+                // initial move inner
+                Enumerable<TInner> innerEnumerable = inner.apply(outerValue);
+                if (innerEnumerable == null) {
+                  innerEnumerable = Linq4j.emptyEnumerable();
+                }
+                if (innerEnumerator != null) {
+                  innerEnumerator.close();
+                }
+                innerEnumerator = innerEnumerable.enumerator();
+                if (innerEnumerator.moveNext()) {
+                  switch (joinType) {
+                  case ANTI:
+                    // For anti-join need to try next outer row
+                    // Current does not match
+                    continue;
+                  case SEMI:
+                    return true; // current row matches
+                  }
+                  // INNER and LEFT just return result
+                  innerValue = innerEnumerator.current();
+                  state = 1; // iterate over inner results
+                  return true;
+                }
+                // No match detected
+                innerValue = null;
+                switch (joinType) {
+                case LEFT:
+                case ANTI:
+                  return true;
+                }
+                // For INNER and LEFT need to find another outer row
+                continue;
+              case 1:
+                // subsequent move inner
+                if (innerEnumerator.moveNext()) {
+                  innerValue = innerEnumerator.current();
+                  return true;
+                }
+                state = 0;
+                // continue loop, move outer
+              }
+            }
+          }
+
+          public void reset() {
+            state = 0;
+            outerEnumerator.reset();
+            closeInner();
+          }
+
+          public void close() {
+            outerEnumerator.close();
+            closeInner();
+            outerValue = null;
+          }
+
+          private void closeInner() {
+            innerValue = null;
+            if (innerEnumerator != null) {
+              innerEnumerator.close();
+              innerEnumerator = null;
+            }
+          }
+        };
+      }
+    };
+  }
+
+  /**
    * Returns the last element of a sequence. (Defined
    * by Enumerable.)
    */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java b/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java
index f657dd7..a5c41c0 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java
@@ -533,6 +533,19 @@ public interface ExtendedEnumerable<TSource> {
       boolean generateNullsOnLeft, boolean generateNullsOnRight);
 
   /**
+   * For each row of the current enumerable returns the correlated rows
+   * from the {@code inner} enumerable (nested loops join).
+   *
+   * @param joinType inner, left, semi or anti join type
+   * @param inner generator of inner enumerable
+   * @param resultSelector selector of the result. For semi/anti join
+   *                       inner argument is always null.
+   */
+  <TInner, TResult> Enumerable<TResult> correlateJoin(
+      CorrelateJoinType joinType, Function1<TSource, Enumerable<TInner>> inner,
+      Function2<TSource, TInner, TResult> resultSelector);
+
+  /**
    * Returns the last element of a sequence. (Defined
    * by Enumerable.)
    */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/test/java/org/apache/calcite/linq4j/test/CorrelateJoinTest.java
----------------------------------------------------------------------
diff --git a/linq4j/src/test/java/org/apache/calcite/linq4j/test/CorrelateJoinTest.java b/linq4j/src/test/java/org/apache/calcite/linq4j/test/CorrelateJoinTest.java
new file mode 100644
index 0000000..6c49453
--- /dev/null
+++ b/linq4j/src/test/java/org/apache/calcite/linq4j/test/CorrelateJoinTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j.test;
+
+import org.apache.calcite.linq4j.CorrelateJoinType;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.function.Function2;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests {@link org.apache.calcite.linq4j.ExtendedEnumerable#correlateJoin}
+ */
+public class CorrelateJoinTest {
+  static final Function2<Integer, Integer, Integer[]> SELECT_BOTH =
+      new Function2<Integer, Integer, Integer[]>() {
+        public Integer[] apply(Integer v0, Integer v1) {
+          return new Integer[]{v0, v1};
+        }
+      };
+
+  @Test public void testInner() {
+    testJoin(CorrelateJoinType.INNER, new Integer[][]{
+      {2, 20},
+      {3, -30},
+      {3, -60},
+      {20, 200},
+      {30, -300},
+      {30, -600}});
+  }
+
+  @Test public void testLeft() {
+    testJoin(CorrelateJoinType.LEFT, new Integer[][]{
+      {1, null},
+      {2, 20},
+      {3, -30},
+      {3, -60},
+      {10, null},
+      {20, 200},
+      {30, -300},
+      {30, -600}});
+  }
+
+  @Test public void testSemi() {
+    testJoin(CorrelateJoinType.SEMI, new Integer[][]{
+      {2, null},
+      {3, null},
+      {20, null},
+      {30, null}});
+  }
+
+  @Test public void testAnti() {
+    testJoin(CorrelateJoinType.ANTI, new Integer[][]{
+      {1, null},
+      {10, null}});
+  }
+
+  public void testJoin(CorrelateJoinType joinType, Integer[][] expected) {
+    Enumerable<Integer[]> join =
+        Linq4j.asEnumerable(ImmutableList.of(1, 2, 3, 10, 20, 30))
+            .correlateJoin(joinType,
+                new Function1<Integer, Enumerable<Integer>>() {
+                  public Enumerable<Integer> apply(Integer a0) {
+                    if (a0 == 1 || a0 == 10) {
+                      return Linq4j.emptyEnumerable();
+                    }
+                    if (a0 == 2 || a0 == 20) {
+                      return Linq4j.singletonEnumerable(a0 * 10);
+                    }
+                    if (a0 == 3 || a0 == 30) {
+                      return Linq4j.asEnumerable(
+                          ImmutableList.of(-a0 * 10, -a0 * 20));
+                    }
+                    throw new IllegalArgumentException(
+                        "Unexpected input " + a0);
+                  }
+                }, SELECT_BOTH);
+    for (int i = 0; i < 2; i++) {
+      Enumerator<Integer[]> e = join.enumerator();
+      checkResults(e, expected);
+      e.close();
+    }
+  }
+
+  private void checkResults(Enumerator<Integer[]> e, Integer[][] expected) {
+    List<Integer[]> res = Lists.newArrayList();
+    while (e.moveNext()) {
+      res.add(e.current());
+    }
+    Integer[][] actual = res.toArray(new Integer[res.size()][]);
+    assertArrayEquals(expected, actual);
+  }
+}
+
+// End CorrelateJoinTest.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
----------------------------------------------------------------------
diff --git a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
index fd044cb..b1dd7a3 100644
--- a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
+++ b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
@@ -36,7 +36,8 @@ import org.junit.runners.Suite;
     DeterministicTest.class,
     BlockBuilderTest.class,
     FunctionTest.class,
-    TypeTest.class
+    TypeTest.class,
+    CorrelateJoinTest.class
 })
 public class Linq4jSuite {
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
index e95dc50..0d89f6f 100644
--- a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
+++ b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
@@ -343,7 +343,7 @@ public abstract class SparkRules {
                 builder2,
                 new RexToLixTranslator.InputGetterImpl(
                     Collections.singletonList(
-                        Pair.of((Expression) e_, result.physType))));
+                        Pair.of((Expression) e_, result.physType))), null);
         builder2.add(
             Expressions.ifThen(
                 Expressions.not(condition),
@@ -360,7 +360,7 @@ public abstract class SparkRules {
               null,
               new RexToLixTranslator.InputGetterImpl(
                   Collections.singletonList(
-                      Pair.of((Expression) e_, result.physType))));
+                      Pair.of((Expression) e_, result.physType))), null);
       builder2.add(
           Expressions.return_(null,
               Expressions.convert_(

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
index 0e572f1..e270d44 100644
--- a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
+++ b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
@@ -107,7 +107,6 @@ public class SparkToEnumerableConverter
       if (parent != null) {
         assert input == parent.getInputs().get(ordinal);
       }
-      createFrame(parent, ordinal, input);
       return input.implementSpark(this);
     }
 


[3/3] incubator-calcite git commit: [CALCITE-483][CALCITE-489] Update Correlate mechanics and implement EnumerableCorrelate (aka nested loops join)

Posted by vl...@apache.org.
[CALCITE-483][CALCITE-489] Update Correlate mechanics and implement EnumerableCorrelate (aka nested loops join)

Notable changes:
 * Correlation variable is produced by Correlate relation for its left input
 * In the right input, correlation is referred as RexFieldAccess(RexCorrelVariable)
 * De-correlation is still performed by default, however it can be disabled by forceDecorrelate=false connection property

fixes #20


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/696da168
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/696da168
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/696da168

Branch: refs/heads/master
Commit: 696da1685f6c86b6e90abfbe369ea861deeb72d5
Parents: 8e196a4
Author: Vladimir Sitnikov <si...@gmail.com>
Authored: Fri Dec 12 10:20:09 2014 +0300
Committer: Vladimir Sitnikov <si...@gmail.com>
Committed: Fri Dec 12 10:21:49 2014 +0300

----------------------------------------------------------------------
 .../calcite/adapter/enumerable/EnumUtils.java   |  50 +++++
 .../adapter/enumerable/EnumerableCalc.java      |   6 +-
 .../adapter/enumerable/EnumerableCorrelate.java | 118 +++++++++++
 .../enumerable/EnumerableCorrelateRule.java     |  51 +++++
 .../adapter/enumerable/EnumerableJoin.java      |  43 +---
 .../enumerable/EnumerableRelImplementor.java    |  37 +++-
 .../adapter/enumerable/EnumerableRules.java     |   3 +
 .../adapter/enumerable/JavaRelImplementor.java  |  20 +-
 .../adapter/enumerable/RexToLixTranslator.java  |  65 +++++-
 .../calcite/config/CalciteConnectionConfig.java |   2 +
 .../config/CalciteConnectionConfigImpl.java     |   5 +
 .../config/CalciteConnectionProperty.java       |   4 +
 .../org/apache/calcite/plan/RelImplementor.java |  30 +--
 .../org/apache/calcite/plan/volcano/RelSet.java |   6 -
 .../calcite/prepare/CalciteMaterializer.java    |   6 +-
 .../calcite/prepare/CalcitePrepareImpl.java     |   1 +
 .../org/apache/calcite/prepare/Prepare.java     |   6 +-
 .../org/apache/calcite/rel/AbstractRelNode.java |  29 +--
 .../main/java/org/apache/calcite/rel/BiRel.java |  86 ++++++++
 .../apache/calcite/rel/RelImplementorImpl.java  | 166 ----------------
 .../java/org/apache/calcite/rel/RelNode.java    |  30 +--
 .../java/org/apache/calcite/rel/RelShuttle.java |   4 +-
 .../org/apache/calcite/rel/RelShuttleImpl.java  |   6 +-
 .../org/apache/calcite/rel/core/Correlate.java  | 197 +++++++++++++++++++
 .../apache/calcite/rel/core/Correlation.java    |  76 -------
 .../apache/calcite/rel/core/CorrelationId.java  |  91 +++++++++
 .../org/apache/calcite/rel/core/Correlator.java | 166 ----------------
 .../java/org/apache/calcite/rel/core/Join.java  |  47 +----
 .../org/apache/calcite/rel/core/Uncollect.java  |   3 +-
 .../apache/calcite/rel/externalize/RelJson.java |  13 +-
 .../calcite/rel/logical/LogicalCorrelate.java   | 104 ++++++++++
 .../rel/metadata/RelMdColumnUniqueness.java     |   4 +-
 .../calcite/rel/metadata/RelMdUniqueKeys.java   |   4 +-
 .../calcite/rel/rules/JoinToCorrelateRule.java  | 138 +++++++++++++
 .../calcite/rel/rules/JoinToCorrelatorRule.java | 129 ------------
 .../rel/rules/ProjectJoinTransposeRule.java     |   4 +
 .../apache/calcite/rel/rules/PushProjector.java |   3 +-
 .../org/apache/calcite/rex/RexExecutorImpl.java |   2 +-
 .../org/apache/calcite/sql/SemiJoinType.java    | 111 +++++++++++
 .../sql2rel/DeduplicateCorrelateVariables.java  | 107 ++++++++++
 .../apache/calcite/sql2rel/RelDecorrelator.java | 195 ++++++++++--------
 .../sql2rel/RelStructuredTypeFlattener.java     |  42 ++--
 .../calcite/sql2rel/SqlToRelConverter.java      | 135 ++++++++-----
 .../java/org/apache/calcite/tools/Programs.java |   1 +
 .../org/apache/calcite/util/BuiltInMethod.java  |   3 +
 .../apache/calcite/util/trace/CalciteTrace.java |   4 +-
 .../org/apache/calcite/test/CalciteSuite.java   |   2 +
 .../java/org/apache/calcite/test/JdbcTest.java  |   2 +-
 .../calcite/test/SqlToRelConverterTest.java     |  18 ++
 .../enumerable/EnumerableCorrelateTest.java     |  99 ++++++++++
 .../calcite/test/enumerable/package-info.java   |  23 +++
 .../calcite/test/SqlToRelConverterTest.xml      | 108 ++++++++--
 .../calcite/linq4j/CorrelateJoinType.java       |  47 +++++
 .../calcite/linq4j/DefaultEnumerable.java       |   7 +
 .../calcite/linq4j/EnumerableDefaults.java      |  99 ++++++++++
 .../calcite/linq4j/ExtendedEnumerable.java      |  13 ++
 .../calcite/linq4j/test/CorrelateJoinTest.java  | 119 +++++++++++
 .../apache/calcite/linq4j/test/Linq4jSuite.java |   3 +-
 .../calcite/adapter/spark/SparkRules.java       |   4 +-
 .../spark/SparkToEnumerableConverter.java       |   1 -
 60 files changed, 1971 insertions(+), 927 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
index e025c5a..d1948f1 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
@@ -17,11 +17,15 @@
 package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.function.Function2;
 import org.apache.calcite.linq4j.tree.BlockStatement;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.linq4j.tree.MethodDeclaration;
 import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexNode;
@@ -32,6 +36,7 @@ import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Type;
 import java.util.AbstractList;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -110,6 +115,51 @@ public class EnumUtils {
       }
     };
   }
+
+  static Expression joinSelector(JoinRelType joinType, PhysType physType,
+      List<PhysType> inputPhysTypes) {
+    // A parameter for each input.
+    final List<ParameterExpression> parameters =
+        new ArrayList<ParameterExpression>();
+
+    // Generate all fields.
+    final List<Expression> expressions =
+        new ArrayList<Expression>();
+    final int outputFieldCount = physType.getRowType().getFieldCount();
+    for (Ord<PhysType> ord : Ord.zip(inputPhysTypes)) {
+      final PhysType inputPhysType =
+          ord.e.makeNullable(joinType.generatesNullsOn(ord.i));
+      // If input item is just a primitive, we do not generate specialized
+      // primitive apply override since it won't be called anyway
+      // Function<T> always operates on boxed arguments
+      final ParameterExpression parameter =
+          Expressions.parameter(Primitive.box(inputPhysType.getJavaRowType()),
+              EnumUtils.LEFT_RIGHT[ord.i]);
+      parameters.add(parameter);
+      if (expressions.size() == outputFieldCount) {
+        // For instance, if semi-join needs to return just the left inputs
+        break;
+      }
+      final int fieldCount = inputPhysType.getRowType().getFieldCount();
+      for (int i = 0; i < fieldCount; i++) {
+        Expression expression =
+            inputPhysType.fieldReference(parameter, i,
+                physType.getJavaFieldType(expressions.size()));
+        if (joinType.generatesNullsOn(ord.i)) {
+          expression =
+              Expressions.condition(
+                  Expressions.equal(parameter, Expressions.constant(null)),
+                  Expressions.constant(null),
+                  expression);
+        }
+        expressions.add(expression);
+      }
+    }
+    return Expressions.lambda(
+        Function2.class,
+        physType.record(expressions),
+        parameters);
+  }
 }
 
 // End EnumUtils.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java
index 69afcb4..8a849fb 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCalc.java
@@ -120,7 +120,8 @@ public class EnumerableCalc extends Calc implements EnumerableRel {
               builder2,
               new RexToLixTranslator.InputGetterImpl(
                   Collections.singletonList(
-                      Pair.of(input, result.physType))));
+                      Pair.of(input, result.physType))),
+              implementor.allCorrelateVariables);
       builder2.add(
           Expressions.ifThen(
               condition,
@@ -147,7 +148,8 @@ public class EnumerableCalc extends Calc implements EnumerableRel {
             physType,
             new RexToLixTranslator.InputGetterImpl(
                 Collections.singletonList(
-                    Pair.of(input, result.physType))));
+                    Pair.of(input, result.physType))),
+            implementor.allCorrelateVariables);
     builder3.add(
         Expressions.return_(
             null, physType.record(expressions)));

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelate.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelate.java
new file mode 100644
index 0000000..3e2f468
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelate.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.ParameterExpression;
+import org.apache.calcite.linq4j.tree.Primitive;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.BuiltInMethod;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.collect.ImmutableList;
+
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+
+/** Implementation of {@link org.apache.calcite.rel.core.Correlate} in
+ * {@link org.apache.calcite.adapter.enumerable.EnumerableConvention enumerable calling convention}. */
+public class EnumerableCorrelate extends Correlate
+    implements EnumerableRel {
+
+  public EnumerableCorrelate(RelOptCluster cluster, RelTraitSet traits,
+      RelNode left, RelNode right,
+      CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    super(cluster, traits, left, right, correlationId, requiredColumns,
+        joinType);
+  }
+
+  @Override public EnumerableCorrelate copy(RelTraitSet traitSet,
+      RelNode left, RelNode right, CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    return new EnumerableCorrelate(getCluster(),
+        traitSet, left, right, correlationId, requiredColumns, joinType);
+  }
+
+  public Result implement(EnumerableRelImplementor implementor,
+      Prefer pref) {
+    final BlockBuilder builder = new BlockBuilder();
+    final Result leftResult =
+        implementor.visitChild(this, 0, (EnumerableRel) left, pref);
+    Expression leftExpression =
+        builder.append(
+            "left", leftResult.block);
+
+    final BlockBuilder corrBlock = new BlockBuilder();
+    Type corrVarType = leftResult.physType.getJavaRowType();
+    ParameterExpression corrRef; // correlate to be used in inner loop
+    ParameterExpression corrArg; // argument to correlate lambda (must be boxed)
+    if (!Primitive.is(corrVarType)) {
+      corrArg =
+          Expressions.parameter(Modifier.FINAL,
+              corrVarType, getCorrelVariable());
+      corrRef = corrArg;
+    } else {
+      corrArg =
+          Expressions.parameter(Modifier.FINAL,
+              Primitive.box(corrVarType), "$box" + getCorrelVariable());
+      corrRef = (ParameterExpression) corrBlock.append(getCorrelVariable(),
+          Expressions.unbox(corrArg));
+    }
+
+    implementor.registerCorrelVariable(getCorrelVariable(), corrRef,
+        corrBlock, leftResult.physType);
+
+    final Result rightResult =
+        implementor.visitChild(this, 1, (EnumerableRel) right, pref);
+
+    implementor.clearCorrelVariable(getCorrelVariable());
+
+    corrBlock.add(rightResult.block);
+
+    final PhysType physType =
+        PhysTypeImpl.of(
+            implementor.getTypeFactory(),
+            getRowType(),
+            pref.prefer(JavaRowFormat.CUSTOM));
+
+    Expression selector =
+        EnumUtils.joinSelector(
+            joinType.returnsJustFirstInput() ? joinType.toJoinType()
+                : JoinRelType.INNER, physType,
+            ImmutableList.of(leftResult.physType, rightResult.physType));
+
+    builder.append(Expressions.call(leftExpression,
+        BuiltInMethod.CORRELATE_JOIN.method,
+        Expressions.constant(joinType.toLinq4j()),
+        Expressions.lambda(corrBlock.toBlock(), corrArg),
+        selector
+    ));
+
+    return implementor.result(physType, builder.toBlock());
+  }
+}
+
+// End EnumerableCorrelate.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelateRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelateRule.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelateRule.java
new file mode 100644
index 0000000..eb08a27
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelateRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.adapter.enumerable;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+
+/**
+ * Implementation of nested loops over enumerable inputs.
+ */
+public class EnumerableCorrelateRule extends ConverterRule {
+  EnumerableCorrelateRule() {
+    super(LogicalCorrelate.class, Convention.NONE,
+        EnumerableConvention.INSTANCE, "EnumerableCorrelateRule");
+  }
+
+  public RelNode convert(RelNode rel) {
+    final LogicalCorrelate c = (LogicalCorrelate) rel;
+    final RelTraitSet traitSet =
+        c.getTraitSet().replace(EnumerableConvention.INSTANCE);
+    return new EnumerableCorrelate(
+        rel.getCluster(),
+        traitSet,
+        convert(c.getLeft(), c.getLeft().getTraitSet()
+            .replace(EnumerableConvention.INSTANCE)),
+        convert(c.getRight(), c.getRight().getTraitSet()
+            .replace(EnumerableConvention.INSTANCE)),
+        c.getCorrelationId(),
+        c.getRequiredColumns(),
+        c.getJoinType());
+  }
+}
+
+// End EnumerableCorrelateRule.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java
index 4508c5a..0498b6b 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java
@@ -16,12 +16,9 @@
  */
 package org.apache.calcite.adapter.enumerable;
 
-import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.linq4j.function.Function2;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.linq4j.tree.ParameterExpression;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
@@ -39,8 +36,6 @@ import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Set;
 
 /** Implementation of {@link org.apache.calcite.rel.core.Join} in
@@ -167,7 +162,7 @@ public class EnumerableJoin extends EquiJoin implements EnumerableRel {
                     rightExpression,
                     leftResult.physType.generateAccessor(leftKeys),
                     rightResult.physType.generateAccessor(rightKeys),
-                    generateSelector(
+                    EnumUtils.joinSelector(joinType,
                         physType,
                         ImmutableList.of(
                             leftResult.physType, rightResult.physType)))
@@ -180,42 +175,6 @@ public class EnumerableJoin extends EquiJoin implements EnumerableRel {
                         joinType.generatesNullsOnRight())))).toBlock());
   }
 
-  Expression generateSelector(PhysType physType,
-      List<PhysType> inputPhysTypes) {
-    // A parameter for each input.
-    final List<ParameterExpression> parameters =
-        new ArrayList<ParameterExpression>();
-
-    // Generate all fields.
-    final List<Expression> expressions =
-        new ArrayList<Expression>();
-    for (Ord<PhysType> ord : Ord.zip(inputPhysTypes)) {
-      final PhysType inputPhysType =
-          ord.e.makeNullable(joinType.generatesNullsOn(ord.i));
-      final ParameterExpression parameter =
-          Expressions.parameter(inputPhysType.getJavaRowType(),
-              EnumUtils.LEFT_RIGHT[ord.i]);
-      parameters.add(parameter);
-      final int fieldCount = inputPhysType.getRowType().getFieldCount();
-      for (int i = 0; i < fieldCount; i++) {
-        Expression expression =
-            inputPhysType.fieldReference(parameter, i,
-                physType.getJavaFieldType(i));
-        if (joinType.generatesNullsOn(ord.i)) {
-          expression =
-              Expressions.condition(
-                  Expressions.equal(parameter, Expressions.constant(null)),
-                  Expressions.constant(null),
-                  expression);
-        }
-        expressions.add(expression);
-      }
-    }
-    return Expressions.lambda(
-        Function2.class,
-        physType.record(expressions),
-        parameters);
-  }
 }
 
 // End EnumerableJoin.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelImplementor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelImplementor.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelImplementor.java
index 86637dc..436eb4e 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelImplementor.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelImplementor.java
@@ -20,6 +20,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.BlockStatement;
 import org.apache.calcite.linq4j.tree.Blocks;
@@ -44,6 +45,7 @@ import org.apache.calcite.util.BuiltInMethod;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 import java.io.Serializable;
 import java.lang.reflect.Modifier;
@@ -60,6 +62,16 @@ import java.util.Map;
  */
 public class EnumerableRelImplementor extends JavaRelImplementor {
   public final Map<String, Object> map;
+  private final Map<String, RexToLixTranslator.InputGetter> corrVars =
+      Maps.newHashMap();
+
+  protected final Function1<String, RexToLixTranslator.InputGetter>
+  allCorrelateVariables =
+    new Function1<String, RexToLixTranslator.InputGetter>() {
+      public RexToLixTranslator.InputGetter apply(String name) {
+        return getCorrelVariableGetter(name);
+      }
+    };
 
   public EnumerableRelImplementor(RexBuilder rexBuilder,
       Map<String, Object> internalParameters) {
@@ -75,7 +87,6 @@ public class EnumerableRelImplementor extends JavaRelImplementor {
     if (parent != null) {
       assert child == parent.getInputs().get(ordinal);
     }
-    createFrame(parent, ordinal, child);
     return child.implement(this, prefer);
   }
 
@@ -385,6 +396,30 @@ public class EnumerableRelImplementor extends JavaRelImplementor {
     return Expressions.convert_(e, clazz);
   }
 
+  public void registerCorrelVariable(final String name,
+      final ParameterExpression pe,
+      final BlockBuilder corrBlock, final PhysType physType) {
+    corrVars.put(name, new RexToLixTranslator.InputGetter() {
+      public Expression field(BlockBuilder list, int index, Type storageType) {
+        Expression fieldReference =
+            physType.fieldReference(pe, index, storageType);
+        return corrBlock.append(name + "_" + index, fieldReference);
+      }
+    });
+  }
+
+  public void clearCorrelVariable(String name) {
+    assert corrVars.containsKey(name) : "Correlation variable " + name
+        + " should be defined";
+    corrVars.remove(name);
+  }
+
+  public RexToLixTranslator.InputGetter getCorrelVariableGetter(String name) {
+    assert corrVars.containsKey(name) : "Correlation variable " + name
+        + " should be defined";
+    return corrVars.get(name);
+  }
+
   public EnumerableRel.Result result(PhysType physType, BlockStatement block) {
     return new EnumerableRel.Result(
         block, physType, ((PhysTypeImpl) physType).format);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
index 62ac2eb..700e7e2 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRules.java
@@ -36,6 +36,9 @@ public class EnumerableRules {
   public static final RelOptRule ENUMERABLE_SEMI_JOIN_RULE =
       new EnumerableSemiJoinRule();
 
+  public static final RelOptRule ENUMERABLE_CORRELATE_RULE =
+      new EnumerableCorrelateRule();
+
   private EnumerableRules() {
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/JavaRelImplementor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/JavaRelImplementor.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/JavaRelImplementor.java
index b96efb8..cd0c3f5 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/JavaRelImplementor.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/JavaRelImplementor.java
@@ -19,20 +19,28 @@ package org.apache.calcite.adapter.enumerable;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.tree.ParameterExpression;
-import org.apache.calcite.rel.RelImplementorImpl;
+import org.apache.calcite.plan.RelImplementor;
 import org.apache.calcite.rex.RexBuilder;
 
 /**
- * Abstract base class for implementations of {@link RelImplementorImpl}
+ * Abstract base class for implementations of {@link RelImplementor}
  * that generate java code.
  */
-public abstract class JavaRelImplementor extends RelImplementorImpl {
+public abstract class JavaRelImplementor implements RelImplementor {
+  private final RexBuilder rexBuilder;
+
   public JavaRelImplementor(RexBuilder rexBuilder) {
-    super(rexBuilder);
+    this.rexBuilder = rexBuilder;
+    assert rexBuilder.getTypeFactory() instanceof JavaTypeFactory
+        : "Type factory of rexBuilder should be a JavaTypeFactory";
+  }
+
+  public RexBuilder getRexBuilder() {
+    return rexBuilder;
   }
 
-  @Override public JavaTypeFactory getTypeFactory() {
-    return (JavaTypeFactory) super.getTypeFactory();
+  public JavaTypeFactory getTypeFactory() {
+    return (JavaTypeFactory) rexBuilder.getTypeFactory();
   }
 
   /** Returns the expression with which to access the

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
index 0288e9d..5b140e5 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
@@ -20,6 +20,7 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.avatica.util.ByteString;
 import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.ConstantExpression;
 import org.apache.calcite.linq4j.tree.Expression;
@@ -32,7 +33,9 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexLocalRef;
@@ -87,6 +90,7 @@ public class RexToLixTranslator {
   private final BlockBuilder list;
   private final Map<? extends RexNode, Boolean> exprNullableMap;
   private final RexToLixTranslator parent;
+  private final Function1<String, InputGetter> correlates;
 
   private static Method findMethod(
       Class<?> clazz, String name, Class... parameterTypes) {
@@ -127,6 +131,19 @@ public class RexToLixTranslator {
       Map<? extends RexNode, Boolean> exprNullableMap,
       RexBuilder builder,
       RexToLixTranslator parent) {
+    this(program, typeFactory, inputGetter, list, exprNullableMap, builder,
+        parent, null);
+  }
+
+  private RexToLixTranslator(
+      RexProgram program,
+      JavaTypeFactory typeFactory,
+      InputGetter inputGetter,
+      BlockBuilder list,
+      Map<? extends RexNode, Boolean> exprNullableMap,
+      RexBuilder builder,
+      RexToLixTranslator parent,
+      Function1<String, InputGetter> correlates) {
     this.program = program;
     this.typeFactory = typeFactory;
     this.inputGetter = inputGetter;
@@ -134,6 +151,7 @@ public class RexToLixTranslator {
     this.exprNullableMap = exprNullableMap;
     this.builder = builder;
     this.parent = parent;
+    this.correlates = correlates;
   }
 
   /**
@@ -145,6 +163,8 @@ public class RexToLixTranslator {
    * @param list List of statements, populated with declarations
    * @param outputPhysType Output type, or null
    * @param inputGetter Generates expressions for inputs
+   * @param correlates Provider of references to the values of correlated
+   *                   variables
    * @return Sequence of expressions, optional condition
    */
   public static List<Expression> translateProjects(
@@ -152,7 +172,8 @@ public class RexToLixTranslator {
       JavaTypeFactory typeFactory,
       BlockBuilder list,
       PhysType outputPhysType,
-      InputGetter inputGetter) {
+      InputGetter inputGetter,
+      Function1<String, InputGetter> correlates) {
     List<Type> storageTypes = null;
     if (outputPhysType != null) {
       final RelDataType rowType = outputPhysType.getRowType();
@@ -162,6 +183,7 @@ public class RexToLixTranslator {
       }
     }
     return new RexToLixTranslator(program, typeFactory, inputGetter, list)
+        .setCorrelates(correlates)
         .translateList(program.getProjectList(), storageTypes);
   }
 
@@ -426,6 +448,26 @@ public class RexToLixTranslator {
           nullAs);
     case DYNAMIC_PARAM:
       return translateParameter((RexDynamicParam) expr, nullAs, storageType);
+    case CORREL_VARIABLE:
+      throw new RuntimeException("Cannot translate " + expr + ". Correlated"
+          + " variables should always be referenced by field access");
+    case FIELD_ACCESS:
+      RexFieldAccess fieldAccess = (RexFieldAccess) expr;
+      RexNode target = deref(fieldAccess.getReferenceExpr());
+      // only $cor.field access is supported
+      if (!(target instanceof RexCorrelVariable)) {
+        throw new RuntimeException(
+            "cannot translate expression " + expr);
+      }
+      if (correlates == null) {
+        throw new RuntimeException("Cannot translate " + expr + " since "
+            + "correlate variables resolver is not defined");
+      }
+      InputGetter getter =
+          correlates.apply(((RexCorrelVariable) target).getName());
+      Expression res =
+          getter.field(list, fieldAccess.getField().getIndex(), storageType);
+      return res;
     default:
       if (expr instanceof RexCall) {
         return translateCall((RexCall) expr, nullAs);
@@ -630,12 +672,14 @@ public class RexToLixTranslator {
       RexProgram program,
       JavaTypeFactory typeFactory,
       BlockBuilder list,
-      InputGetter inputGetter) {
+      InputGetter inputGetter,
+      Function1<String, InputGetter> correlates) {
     if (program.getCondition() == null) {
       return RexImpTable.TRUE_EXPR;
     }
-    final RexToLixTranslator translator =
+    RexToLixTranslator translator =
         new RexToLixTranslator(program, typeFactory, inputGetter, list);
+    translator = translator.setCorrelates(correlates);
     return translator.translate(
         program.getCondition(),
         RexImpTable.NullAs.FALSE);
@@ -898,7 +942,8 @@ public class RexToLixTranslator {
       return this;
     }
     return new RexToLixTranslator(
-        program, typeFactory, inputGetter, list, nullable, builder, this);
+        program, typeFactory, inputGetter, list, nullable, builder, this,
+        correlates);
   }
 
   public RexToLixTranslator setBlock(BlockBuilder block) {
@@ -907,7 +952,17 @@ public class RexToLixTranslator {
     }
     return new RexToLixTranslator(
         program, typeFactory, inputGetter, block,
-        Collections.<RexNode, Boolean>emptyMap(), builder, this);
+        Collections.<RexNode, Boolean>emptyMap(), builder, this, correlates);
+  }
+
+  public RexToLixTranslator setCorrelates(
+      Function1<String, InputGetter> correlates) {
+    if (this.correlates == correlates) {
+      return this;
+    }
+    return new RexToLixTranslator(
+        program, typeFactory, inputGetter, list,
+        Collections.<RexNode, Boolean>emptyMap(), builder, this, correlates);
   }
 
   public RelDataType nullifyType(RelDataType type, boolean nullable) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfig.java b/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfig.java
index d7440e8..b41f478 100644
--- a/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfig.java
+++ b/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfig.java
@@ -44,6 +44,8 @@ public interface CalciteConnectionConfig extends ConnectionConfig {
   boolean caseSensitive();
   /** @see CalciteConnectionProperty#SPARK */
   boolean spark();
+  /** @see CalciteConnectionProperty#FORCE_DECORRELATE */
+  boolean forceDecorrelate();
   /** @see CalciteConnectionProperty#TYPE_SYSTEM */
   <T> T typeSystem(Class<T> typeSystemClass, T defaultTypeSystem);
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfigImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfigImpl.java b/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfigImpl.java
index 43d7946..99876b3 100644
--- a/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfigImpl.java
+++ b/core/src/main/java/org/apache/calcite/config/CalciteConnectionConfigImpl.java
@@ -83,6 +83,11 @@ public class CalciteConnectionConfigImpl extends ConnectionConfigImpl
     return CalciteConnectionProperty.SPARK.wrap(properties).getBoolean();
   }
 
+  public boolean forceDecorrelate() {
+    return CalciteConnectionProperty.FORCE_DECORRELATE.wrap(properties)
+        .getBoolean();
+  }
+
   public <T> T typeSystem(Class<T> typeSystemClass, T defaultTypeSystem) {
     return CalciteConnectionProperty.TYPE_SYSTEM.wrap(properties)
         .getPlugin(typeSystemClass, defaultTypeSystem);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java b/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
index c42e2d7..678a49c 100644
--- a/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
+++ b/core/src/main/java/org/apache/calcite/config/CalciteConnectionProperty.java
@@ -72,6 +72,10 @@ public enum CalciteConnectionProperty implements ConnectionProperty {
   /** Timezone, for example 'gmt-3'. Default is the JVM's time zone. */
   TIMEZONE("timezone", Type.STRING, null, false),
 
+  /** If the planner should try de-correlating as much as it is possible.
+   * If true (the default), Calcite de-correlates the plan. */
+  FORCE_DECORRELATE("forceDecorrelate", Type.BOOLEAN, true, false),
+
   /** Type system. The name of a class that implements
    * {@link org.apache.calcite.rel.type.RelDataTypeSystem} and has a public
    * default constructor or an {@code INSTANCE} constant. */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/plan/RelImplementor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/RelImplementor.java b/core/src/main/java/org/apache/calcite/plan/RelImplementor.java
index 6ae9d6c..50b1cef 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelImplementor.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelImplementor.java
@@ -19,38 +19,12 @@ package org.apache.calcite.plan;
 import org.apache.calcite.rel.RelNode;
 
 /**
- * Callback used to hold state while converting a tree of
+ * This is a marker interface for a callback used to convert a tree of
  * {@link RelNode relational expressions} into a plan. Calling
  * conventions typically have their own protocol for walking over a
- * tree, and correspondingly have their own implementors, which are
- * subclasses of <code>RelImplementor</code>.
+ * tree, and correspondingly have their own implementors
  */
 public interface RelImplementor {
-  //~ Methods ----------------------------------------------------------------
-
-  /**
-   * Implements a relational expression according to a calling convention.
-   *
-   * @param parent  Parent relational expression
-   * @param ordinal Ordinal of child within its parent
-   * @param child   Child relational expression
-   * @return Interpretation of the return value is left to the implementor
-   */
-  Object visitChild(
-      RelNode parent,
-      int ordinal,
-      RelNode child);
-
-  /**
-   * Called from {@link #visitChild} after the frame has been set up. Specific
-   * implementors should override this method.
-   *
-   * @param child   Child relational expression
-   * @param ordinal Ordinal of child within its parent
-   * @param arg     Additional parameter; type depends on implementor
-   * @return Interpretation of the return value is left to the implementor
-   */
-  Object visitChildInternal(RelNode child, int ordinal, Object arg);
 }
 
 // End RelImplementor.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java b/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
index d389fce..1af4534 100644
--- a/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
+++ b/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java
@@ -222,12 +222,6 @@ class RelSet {
     if (this.rel == null) {
       this.rel = rel;
     } else {
-      assert rel.getCorrelVariable() == null;
-      String correl = this.rel.getCorrelVariable();
-      if (correl != null) {
-        rel.setCorrelVariable(correl);
-      }
-
       // Row types must be the same, except for field names.
       RelOptUtil.verifyTypeEquivalence(
           this.rel,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/prepare/CalciteMaterializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/CalciteMaterializer.java b/core/src/main/java/org/apache/calcite/prepare/CalciteMaterializer.java
index 118214e..47b753b 100644
--- a/core/src/main/java/org/apache/calcite/prepare/CalciteMaterializer.java
+++ b/core/src/main/java/org/apache/calcite/prepare/CalciteMaterializer.java
@@ -26,11 +26,11 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttle;
-import org.apache.calcite.rel.core.Correlator;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableFunctionScan;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -159,8 +159,8 @@ class CalciteMaterializer extends CalcitePrepareImpl.CalcitePreparingStmt {
     public RelNode visit(LogicalJoin join) {
       return join;
     }
-    public RelNode visit(Correlator correlator) {
-      return correlator;
+    public RelNode visit(LogicalCorrelate correlate) {
+      return correlate;
     }
     public RelNode visit(LogicalUnion union) {
       return union;

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
index 6964984..1eafb3a 100644
--- a/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
+++ b/core/src/main/java/org/apache/calcite/prepare/CalcitePrepareImpl.java
@@ -168,6 +168,7 @@ public class CalcitePrepareImpl implements CalcitePrepare {
       ImmutableList.of(
           EnumerableRules.ENUMERABLE_JOIN_RULE,
           EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
+          EnumerableRules.ENUMERABLE_CORRELATE_RULE,
           EnumerableRules.ENUMERABLE_PROJECT_RULE,
           EnumerableRules.ENUMERABLE_FILTER_RULE,
           EnumerableRules.ENUMERABLE_AGGREGATE_RULE,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/prepare/Prepare.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/prepare/Prepare.java b/core/src/main/java/org/apache/calcite/prepare/Prepare.java
index 5f33120..ccbf175 100644
--- a/core/src/main/java/org/apache/calcite/prepare/Prepare.java
+++ b/core/src/main/java/org/apache/calcite/prepare/Prepare.java
@@ -257,8 +257,10 @@ public abstract class Prepare {
     // storage.
     rootRel = flattenTypes(rootRel, true);
 
-    // Subquery decorrelation.
-    rootRel = decorrelate(sqlToRelConverter, sqlQuery, rootRel);
+    if (this.context.config().forceDecorrelate()) {
+      // Subquery decorrelation.
+      rootRel = decorrelate(sqlToRelConverter, sqlQuery, rootRel);
+    }
 
     // Trim unused fields.
     rootRel = trimUnusedFields(rootRel);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/AbstractRelNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/AbstractRelNode.java b/core/src/main/java/org/apache/calcite/rel/AbstractRelNode.java
index 1bd85cb..d220dc4 100644
--- a/core/src/main/java/org/apache/calcite/rel/AbstractRelNode.java
+++ b/core/src/main/java/org/apache/calcite/rel/AbstractRelNode.java
@@ -93,12 +93,6 @@ public abstract class AbstractRelNode implements RelNode {
   protected int id;
 
   /**
-   * The variable by which to refer to rows from this relational expression,
-   * as correlating expressions; null if this expression is not correlated on.
-   */
-  private String correlVariable;
-
-  /**
    * The RelTraitSet that describes the traits of this RelNode.
    */
   protected RelTraitSet traitSet;
@@ -159,12 +153,8 @@ public abstract class AbstractRelNode implements RelNode {
     return traitSet;
   }
 
-  public void setCorrelVariable(String correlVariable) {
-    this.correlVariable = correlVariable;
-  }
-
   public String getCorrelVariable() {
-    return correlVariable;
+    return null;
   }
 
   public boolean isDistinct() {
@@ -184,14 +174,6 @@ public abstract class AbstractRelNode implements RelNode {
     return inputs.get(i);
   }
 
-  public String getOrCreateCorrelVariable() {
-    if (correlVariable == null) {
-      correlVariable = getQuery().createCorrel();
-      getQuery().mapCorrel(correlVariable, this);
-    }
-    return correlVariable;
-  }
-
   public final RelOptQuery getQuery() {
     return getCluster().getQuery();
   }
@@ -256,9 +238,6 @@ public abstract class AbstractRelNode implements RelNode {
   }
 
   public void collectVariablesSet(Set<String> variableSet) {
-    if (correlVariable != null) {
-      variableSet.add(correlVariable);
-    }
   }
 
   public void childrenAccept(RelVisitor visitor) {
@@ -348,12 +327,6 @@ public abstract class AbstractRelNode implements RelNode {
     return this.digest;
   }
 
-  public void registerCorrelVariable(String correlVariable) {
-    assert this.correlVariable == null;
-    this.correlVariable = correlVariable;
-    getQuery().mapCorrel(correlVariable, this);
-  }
-
   public void replaceInput(
       int ordinalInParent,
       RelNode p) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/BiRel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/BiRel.java b/core/src/main/java/org/apache/calcite/rel/BiRel.java
new file mode 100644
index 0000000..494d473
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/BiRel.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.runtime.FlatLists;
+import org.apache.calcite.util.Util;
+
+import java.util.List;
+
+/**
+ * Abstract base class for relational expressions with a two inputs.
+ *
+ * <p>It is not required that two-input relational expressions use this
+ * class as a base class. However, default implementations of methods make life
+ * easier.
+ */
+public abstract class BiRel extends AbstractRelNode {
+  protected RelNode left;
+  protected RelNode right;
+
+  public BiRel(
+      RelOptCluster cluster, RelTraitSet traitSet, RelNode left,
+      RelNode right) {
+    super(cluster, traitSet);
+    this.left = left;
+    this.right = right;
+  }
+
+  public void childrenAccept(RelVisitor visitor) {
+    visitor.visit(left, 0, this);
+    visitor.visit(right, 1, this);
+  }
+
+  public List<RelNode> getInputs() {
+    return FlatLists.of(left, right);
+  }
+
+  public RelNode getLeft() {
+    return left;
+  }
+
+  public RelNode getRight() {
+    return right;
+  }
+
+  public void replaceInput(
+      int ordinalInParent,
+      RelNode p) {
+    switch (ordinalInParent) {
+    case 0:
+      this.left = p;
+      break;
+    case 1:
+      this.right = p;
+      break;
+    default:
+      throw Util.newInternal();
+    }
+    recomputeDigest();
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw)
+        .input("left", left)
+        .input("right", right);
+  }
+}
+
+// End BiRel.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/RelImplementorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/RelImplementorImpl.java b/core/src/main/java/org/apache/calcite/rel/RelImplementorImpl.java
deleted file mode 100644
index 8e42bed..0000000
--- a/core/src/main/java/org/apache/calcite/rel/RelImplementorImpl.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel;
-
-import org.apache.calcite.plan.RelImplementor;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.util.trace.CalciteTrace;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
-
-/**
- * Implementation of {@link RelImplementor}.
- */
-public class RelImplementorImpl implements RelImplementor {
-  protected static final Logger LOGGER =
-      CalciteTrace.getRelImplementorTracer();
-
-  /**
-   * Maps a {@link String} to the {@link RelImplementorImpl.Frame} whose
-   * {@link Frame#rel}.correlVariable == correlName.
-   */
-  protected final Map<String, Frame> mapCorrel2Frame =
-      new HashMap<String, Frame>();
-
-  /**
-   * Maps a {@link RelNode} to the unique frame whose
-   * {@link RelImplementorImpl.Frame#rel} is
-   * that relational expression.
-   */
-  protected final Map<RelNode, Frame> mapRel2Frame =
-      new HashMap<RelNode, Frame>();
-
-  protected final RexBuilder rexBuilder;
-
-  public RelImplementorImpl(RexBuilder rexBuilder) {
-    this.rexBuilder = rexBuilder;
-  }
-
-  public RexBuilder getRexBuilder() {
-    return rexBuilder;
-  }
-
-  public RelDataTypeFactory getTypeFactory() {
-    return rexBuilder.getTypeFactory();
-  }
-
-  public final Object visitChild(
-      RelNode parent,
-      int ordinal,
-      RelNode child) {
-    if (parent != null) {
-      assert child == parent.getInputs().get(ordinal);
-    }
-    createFrame(parent, ordinal, child);
-    return visitChildInternal(child, ordinal, null);
-  }
-
-  protected void createFrame(RelNode parent, int ordinal, RelNode child) {
-    Frame frame = new Frame(child, parent, ordinal);
-    mapRel2Frame.put(child, frame);
-    String correl = child.getCorrelVariable();
-    if (correl != null) {
-      // Record that this frame is responsible for setting this
-      // variable. But if another frame is already doing the job --
-      // this frame's parent, which belongs to the same set -- don't
-      // override it.
-      if (mapCorrel2Frame.get(correl) == null) {
-        mapCorrel2Frame.put(correl, frame);
-      }
-    }
-  }
-
-  public Object visitChildInternal(RelNode child, int ordinal, Object arg) {
-    throw new UnsupportedOperationException();
-  }
-
-  protected RelNode findInputRel(
-      RelNode rel, int offset) {
-    return findInputRel(
-        rel,
-        offset,
-        new int[]{0});
-  }
-
-  private RelNode findInputRel(
-      RelNode rel,
-      int offset,
-      int[] offsets) {
-    if (rel instanceof Join) {
-      // no variable here -- go deeper
-      List<RelNode> inputs = rel.getInputs();
-      for (RelNode input : inputs) {
-        RelNode result = findInputRel(input, offset, offsets);
-        if (result != null) {
-          return result;
-        }
-      }
-    } else if (offset == offsets[0]) {
-      return rel;
-    } else {
-      offsets[0]++;
-    }
-    return null; // not found
-  }
-
-  /**
-   * Returns a list of the relational expressions which are ancestors of the
-   * current one.
-   */
-  public List<RelNode> getAncestorRels(RelNode rel) {
-    final List<RelNode> ancestorList = new ArrayList<RelNode>();
-    Frame frame = mapRel2Frame.get(rel);
-    assert frame != null : "rel must be on the current implementation stack";
-    while (true) {
-      ancestorList.add(frame.rel);
-      final RelNode parentRel = frame.parent;
-      if (parentRel == null) {
-        break;
-      }
-      frame = mapRel2Frame.get(parentRel);
-      assert frame != null : "ancestor rel must have frame";
-    }
-    return ancestorList;
-  }
-
-  /** Information about a call from a parent relational expression
-   * to implement one of its input relational expressions. */
-  protected static class Frame {
-    /** Parent relational expression. */
-    public final RelNode parent;
-
-    /** Relational expression that is being implemented in this frame. */
-    public final RelNode rel;
-
-    /** Ordinal of {@code rel} within {@code parent}. */
-    public final int ordinal;
-
-    Frame(RelNode child, RelNode parent, int ordinal) {
-      this.rel = child;
-      this.parent = parent;
-      this.ordinal = ordinal;
-    }
-  }
-}
-
-// End RelImplementorImpl.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/RelNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/RelNode.java b/core/src/main/java/org/apache/calcite/rel/RelNode.java
index fa28861..7f993f8 100644
--- a/core/src/main/java/org/apache/calcite/rel/RelNode.java
+++ b/core/src/main/java/org/apache/calcite/rel/RelNode.java
@@ -96,17 +96,9 @@ public interface RelNode extends RelOptNode, Cloneable {
   Convention getConvention();
 
   /**
-   * Sets the name of the variable which is to be implicitly set at runtime
-   * each time a row is returned from this relational expression
-   *
-   * @param correlVariable Name of correlating variable
-   */
-  void setCorrelVariable(String correlVariable);
-
-  /**
    * Returns the name of the variable which is to be implicitly set at runtime
-   * each time a row is returned from this relational expression; or null if
-   * there is no variable.
+   * each time a row is returned from the first input of this relational
+   * expression; or null if there is no variable.
    *
    * @return Name of correlating variable, or null
    */
@@ -127,13 +119,6 @@ public interface RelNode extends RelOptNode, Cloneable {
   RelNode getInput(int i);
 
   /**
-   * Returns a variable with which to reference the current row of this
-   * relational expression as a correlating variable. Creates a variable if
-   * none exists.
-   */
-  String getOrCreateCorrelVariable();
-
-  /**
    * Returns the sub-query this relational expression belongs to. A sub-query
    * determines the scope for correlating variables (see
    * {@link #setCorrelVariable(String)}).
@@ -177,9 +162,6 @@ public interface RelNode extends RelOptNode, Cloneable {
    * Returns the names of variables which are set in this relational
    * expression but also used and therefore not available to parents of this
    * relational expression.
-   *
-   * <p>By default, returns the empty set. Derived classes may override this
-   * method.</p>
    */
   Set<String> getVariablesStopped();
 
@@ -195,6 +177,7 @@ public interface RelNode extends RelOptNode, Cloneable {
 
   /**
    * Collects variables set by this expression.
+   * TODO: is this required?
    *
    * @param variableSet receives variables known to be set by
    */
@@ -253,13 +236,6 @@ public interface RelNode extends RelOptNode, Cloneable {
   String recomputeDigest();
 
   /**
-   * Registers a correlation variable.
-   *
-   * @see #getVariablesStopped
-   */
-  void registerCorrelVariable(String correlVariable);
-
-  /**
    * Replaces the <code>ordinalInParent</code><sup>th</sup> input. You must
    * override this method if you override {@link #getInputs}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/RelShuttle.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/RelShuttle.java b/core/src/main/java/org/apache/calcite/rel/RelShuttle.java
index c1e1915..e09eb7a 100644
--- a/core/src/main/java/org/apache/calcite/rel/RelShuttle.java
+++ b/core/src/main/java/org/apache/calcite/rel/RelShuttle.java
@@ -16,11 +16,11 @@
  */
 package org.apache.calcite.rel;
 
-import org.apache.calcite.rel.core.Correlator;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableFunctionScan;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -45,7 +45,7 @@ public interface RelShuttle {
 
   RelNode visit(LogicalJoin join);
 
-  RelNode visit(Correlator correlator);
+  RelNode visit(LogicalCorrelate correlate);
 
   RelNode visit(LogicalUnion union);
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/RelShuttleImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/RelShuttleImpl.java b/core/src/main/java/org/apache/calcite/rel/RelShuttleImpl.java
index 6b7cb96..8eec43c 100644
--- a/core/src/main/java/org/apache/calcite/rel/RelShuttleImpl.java
+++ b/core/src/main/java/org/apache/calcite/rel/RelShuttleImpl.java
@@ -17,11 +17,11 @@
 package org.apache.calcite.rel;
 
 import org.apache.calcite.linq4j.Ord;
-import org.apache.calcite.rel.core.Correlator;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.TableFunctionScan;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -97,8 +97,8 @@ public class RelShuttleImpl implements RelShuttle {
     return visitChildren(join);
   }
 
-  public RelNode visit(Correlator correlator) {
-    return visitChildren(correlator);
+  public RelNode visit(LogicalCorrelate correlate) {
+    return visitChildren(correlate);
   }
 
   public RelNode visit(LogicalUnion union) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/core/Correlate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Correlate.java b/core/src/main/java/org/apache/calcite/rel/core/Correlate.java
new file mode 100644
index 0000000..37edda9
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/core/Correlate.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.core;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A relational operator that performs nested-loop joins.
+ * <p/>
+ * <p>It behaves like a kind of {@link org.apache.calcite.rel.core.Join},
+ * but works by setting variables in its environment and restarting its
+ * right-hand input.
+ * <p/>
+ * <p>Correlate is not a join since: typical rules should not match Correlate.
+ * <p/>
+ * <p>A Correlate is used to represent a correlated query. One
+ * implementation strategy is to de-correlate the expression.
+ *
+ * NestedLoops     -> Correlate(A, B, regular)
+ * NestedLoopsOuter-> Correlate(A, B, outer)
+ * NestedLoopsSemi -> Correlate(A, B, semi)
+ * NestedLoopsAnti -> Correlate(A, B, anti)
+ * HashJoin        -> EquiJoin(A, B)
+ * HashJoinOuter   -> EquiJoin(A, B)
+ * HashJoinSemi    -> SemiJoin(A, B, semi)
+ * HashJoinAnti    -> SemiJoin(A, B, anti)
+ *
+ * @see CorrelationId
+ */
+public abstract class Correlate extends BiRel {
+  //~ Instance fields --------------------------------------------------------
+
+  protected final CorrelationId correlationId;
+  protected final ImmutableBitSet requiredColumns;
+  protected final SemiJoinType joinType;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a Correlate.
+   * @param cluster      cluster this relational expression belongs to
+   * @param left         left input relational expression
+   * @param right        right input relational expression
+   * @param correlationId variable name for the row of left input
+   * @param requiredColumns
+   * @param joinType join type
+   */
+  protected Correlate(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      RelNode left,
+      RelNode right,
+      CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    super(cluster, traits, left, right);
+    this.joinType = joinType;
+    this.correlationId = correlationId;
+    this.requiredColumns = requiredColumns;
+  }
+
+  /**
+   * Creates a Correlate by parsing serialized output.
+   */
+  public Correlate(RelInput input) {
+    this(
+        input.getCluster(), input.getTraitSet(), input.getInputs().get(0),
+        input.getInputs().get(1),
+        new CorrelationId((Integer) input.get("correlationId")),
+        input.getBitSet("requiredColumns"),
+        input.getEnum("joinType", SemiJoinType.class));
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @Override public Correlate copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    assert inputs.size() == 2;
+    return copy(traitSet,
+        inputs.get(0),
+        inputs.get(1),
+        correlationId,
+        requiredColumns,
+        joinType);
+  }
+
+  public abstract Correlate copy(RelTraitSet traitSet,
+      RelNode left, RelNode right, CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType);
+
+  public SemiJoinType getJoinType() {
+    return joinType;
+  }
+
+  @Override protected RelDataType deriveRowType() {
+    switch (joinType) {
+    case LEFT:
+    case INNER:
+      // LogicalJoin is used to share the code of column names deduplication
+      return new LogicalJoin(getCluster(), left, right,
+          getCluster().getRexBuilder().makeLiteral(true),
+          joinType.toJoinType(), ImmutableSet.<String>of())
+          .deriveRowType();
+    case ANTI:
+    case SEMI:
+      return left.getRowType();
+    default:
+      throw new IllegalStateException("Unknown join type " + joinType);
+    }
+  }
+
+  @Override public RelWriter explainTerms(RelWriter pw) {
+    return super.explainTerms(pw)
+        .item("correlation", correlationId)
+        .item("joinType", joinType)
+        .item("requiredColumns", requiredColumns.toString());
+  }
+
+  /**
+   * Returns the correlating expressions.
+   *
+   * @return correlating expressions
+   */
+  public CorrelationId getCorrelationId() {
+    return correlationId;
+  }
+
+  @Override public String getCorrelVariable() {
+    return correlationId.getName();
+  }
+
+  /**
+   * Returns the required columns in left relation required for the correlation
+   * in the right.
+   *
+   * @return columns in left relation required for the correlation in the right
+   */
+  public ImmutableBitSet getRequiredColumns() {
+    return requiredColumns;
+  }
+
+  @Override public Set<String> getVariablesStopped() {
+    return ImmutableSet.of(correlationId.getName());
+  }
+
+  @Override public RelOptCost computeSelfCost(RelOptPlanner planner) {
+    double rowCount = RelMetadataQuery.getRowCount(this);
+
+    final double rightRowCount = right.getRows();
+    final double leftRowCount = left.getRows();
+    if (Double.isInfinite(leftRowCount) || Double.isInfinite(rightRowCount)) {
+      return planner.getCostFactory().makeInfiniteCost();
+    }
+
+    Double restartCount = RelMetadataQuery.getRowCount(getLeft());
+    // RelMetadataQuery.getCumulativeCost(getRight()); does not work for
+    // RelSubset, so we ask planner to cost-estimate right relation
+    RelOptCost rightCost = planner.getCost(getRight());
+    RelOptCost rescanCost =
+        rightCost.multiplyBy(Math.max(1.0, restartCount - 1));
+
+    return planner.getCostFactory().makeCost(
+        rowCount /* generate results */ + leftRowCount /* scan left results */,
+        0, 0).plus(rescanCost);
+  }
+}
+
+// End Correlate.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/core/Correlation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Correlation.java b/core/src/main/java/org/apache/calcite/rel/core/Correlation.java
deleted file mode 100644
index 7aed265..0000000
--- a/core/src/main/java/org/apache/calcite/rel/core/Correlation.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.core;
-
-/**
- * Describes the necessary parameters for an implementation in order to
- * identify and set dynamic variables
- */
-public class Correlation implements Cloneable, Comparable<Correlation> {
-  private final int id;
-  private final int offset;
-
-  /**
-   * Creates a correlation.
-   *
-   * @param id     Identifier
-   * @param offset Offset
-   */
-  public Correlation(int id, int offset) {
-    this.id = id;
-    this.offset = offset;
-  }
-
-  /**
-   * Returns the identifier.
-   *
-   * @return identifier
-   */
-  public int getId() {
-    return id;
-  }
-
-  /**
-   * Returns this correlation's offset.
-   *
-   * @return offset
-   */
-  public int getOffset() {
-    return offset;
-  }
-
-  public String toString() {
-    return "var" + id + "=offset" + offset;
-  }
-
-  public int compareTo(Correlation other) {
-    return id - other.id;
-  }
-
-  @Override public int hashCode() {
-    return id;
-  }
-
-  @Override public boolean equals(Object obj) {
-    return this == obj
-        || obj instanceof Correlation
-        && this.id == ((Correlation) obj).id
-        && this.offset == ((Correlation) obj).offset;
-  }
-}
-
-// End Correlation.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/core/CorrelationId.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/CorrelationId.java b/core/src/main/java/org/apache/calcite/rel/core/CorrelationId.java
new file mode 100644
index 0000000..a47bf52
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/core/CorrelationId.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.core;
+
+/**
+ * Describes the necessary parameters for an implementation in order to
+ * identify and set dynamic variables
+ */
+public class CorrelationId implements Cloneable, Comparable<CorrelationId> {
+  private static final String CORREL_PREFIX = "$cor";
+
+  private final int id;
+  private final String name;
+
+  /**
+   * Creates a correlation identifier.
+   * This is a type-safe wrapper over int.
+   *
+   * @param id     Identifier
+   */
+  public CorrelationId(int id) {
+    this.id = id;
+    this.name = CORREL_PREFIX + id;
+  }
+
+  /**
+   * Creates a correlation identifier.
+   * This is a type-safe wrapper over int.
+   *
+   * @param name     variable name
+   */
+  public CorrelationId(String name) {
+    assert name != null && name.startsWith(CORREL_PREFIX)
+        : "Correlation name should start with " + CORREL_PREFIX
+        + " actual name is " + name;
+    this.id = Integer.parseInt(name.substring(CORREL_PREFIX.length()));
+    this.name = name;
+  }
+
+  /**
+   * Returns the identifier.
+   *
+   * @return identifier
+   */
+  public int getId() {
+    return id;
+  }
+
+  /**
+   * Returns the preffered name of the variable.
+   *
+   * @return name
+   */
+  public String getName() {
+    return name;
+  }
+
+  public String toString() {
+    return name;
+  }
+
+  public int compareTo(CorrelationId other) {
+    return id - other.id;
+  }
+
+  @Override public int hashCode() {
+    return id;
+  }
+
+  @Override public boolean equals(Object obj) {
+    return this == obj
+        || obj instanceof CorrelationId
+        && this.id == ((CorrelationId) obj).id;
+  }
+}
+
+// End Correlation.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/core/Correlator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Correlator.java b/core/src/main/java/org/apache/calcite/rel/core/Correlator.java
deleted file mode 100644
index a80d324..0000000
--- a/core/src/main/java/org/apache/calcite/rel/core/Correlator.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.core;
-
-import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelInput;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelShuttle;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rex.RexNode;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A relational operator that performs nested-loop joins.
- *
- * <p>It behaves like a kind of {@link Join},
- * but works by setting variables in its environment and restarting its
- * right-hand input.
- *
- * <p>A Correlator is used to represent a correlated query. One
- * implementation strategy is to de-correlate the expression.
- *
- * @see Correlation
- */
-public final class Correlator extends Join {
-  //~ Instance fields --------------------------------------------------------
-
-  protected final ImmutableList<Correlation> correlations;
-
-  //~ Constructors -----------------------------------------------------------
-
-  /**
-   * Creates a Correlator.
-   *
-   * @param cluster      cluster this relational expression belongs to
-   * @param left         left input relational expression
-   * @param right        right input relational expression
-   * @param joinCond     join condition
-   * @param correlations set of expressions to set as variables each time a
-   *                     row arrives from the left input
-   * @param joinType     join type
-   */
-  public Correlator(
-      RelOptCluster cluster,
-      RelNode left,
-      RelNode right,
-      RexNode joinCond,
-      List<Correlation> correlations,
-      JoinRelType joinType) {
-    super(
-        cluster,
-        cluster.traitSetOf(Convention.NONE),
-        left,
-        right,
-        joinCond,
-        joinType,
-        ImmutableSet.<String>of());
-    this.correlations = ImmutableList.copyOf(correlations);
-    assert (joinType == JoinRelType.LEFT)
-        || (joinType == JoinRelType.INNER);
-  }
-
-  /**
-   * Creates a Correlator with no join condition.
-   *
-   * @param cluster      Cluster that this relational expression belongs to
-   * @param left         left input relational expression
-   * @param right        right input relational expression
-   * @param correlations set of expressions to set as variables each time a
-   *                     row arrives from the left input
-   * @param joinType     join type
-   */
-  public Correlator(
-      RelOptCluster cluster,
-      RelNode left,
-      RelNode right,
-      List<Correlation> correlations,
-      JoinRelType joinType) {
-    this(
-        cluster,
-        left,
-        right,
-        cluster.getRexBuilder().makeLiteral(true),
-        correlations,
-        joinType);
-  }
-
-  /**
-   * Creates a Correlator by parsing serialized output.
-   */
-  public Correlator(RelInput input) {
-    this(
-        input.getCluster(), input.getInputs().get(0),
-        input.getInputs().get(1), getCorrelations(input),
-        input.getEnum("joinType", JoinRelType.class));
-  }
-
-  private static List<Correlation> getCorrelations(RelInput input) {
-    final List<Correlation> list = new ArrayList<Correlation>();
-    //noinspection unchecked
-    final List<Map<String, Object>> correlations1 =
-        (List<Map<String, Object>>) input.get("correlations");
-    for (Map<String, Object> correlation : correlations1) {
-      list.add(
-          new Correlation(
-              (Integer) correlation.get("correlation"),
-              (Integer) correlation.get("offset")));
-    }
-    return list;
-  }
-
-  //~ Methods ----------------------------------------------------------------
-
-  @Override public Correlator copy(RelTraitSet traitSet, RexNode conditionExpr,
-      RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
-    assert traitSet.containsIfApplicable(Convention.NONE);
-    return new Correlator(
-        getCluster(),
-        left,
-        right,
-        correlations,
-        this.joinType);
-  }
-
-  @Override public RelNode accept(RelShuttle shuttle) {
-    return shuttle.visit(this);
-  }
-
-  public RelWriter explainTerms(RelWriter pw) {
-    return super.explainTerms(pw)
-        .item("correlations", correlations);
-  }
-
-  /**
-   * Returns the correlating expressions.
-   *
-   * @return correlating expressions
-   */
-  public List<Correlation> getCorrelations() {
-    return correlations;
-  }
-}
-
-// End Correlator.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/core/Join.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Join.java b/core/src/main/java/org/apache/calcite/rel/core/Join.java
index c3b2f11..ae73eb8 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Join.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Join.java
@@ -20,9 +20,8 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.type.RelDataType;
@@ -30,9 +29,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexChecker;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.runtime.FlatLists;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -51,12 +48,10 @@ import java.util.Set;
  * The set of output rows is a subset of the cartesian product of the two
  * inputs; precisely which subset depends on the join condition.
  */
-public abstract class Join extends AbstractRelNode {
+public abstract class Join extends BiRel {
   //~ Instance fields --------------------------------------------------------
 
   protected final RexNode condition;
-  protected RelNode left;
-  protected RelNode right;
   protected final ImmutableSet<String> variablesStopped;
 
   /**
@@ -88,9 +83,7 @@ public abstract class Join extends AbstractRelNode {
       RexNode condition,
       JoinRelType joinType,
       Set<String> variablesStopped) {
-    super(cluster, traits);
-    this.left = left;
-    this.right = right;
+    super(cluster, traits, left, right);
     this.condition = condition;
     this.variablesStopped = ImmutableSet.copyOf(variablesStopped);
     assert joinType != null;
@@ -108,22 +101,10 @@ public abstract class Join extends AbstractRelNode {
     return condition;
   }
 
-  public List<RelNode> getInputs() {
-    return FlatLists.of(left, right);
-  }
-
   public JoinRelType getJoinType() {
     return joinType;
   }
 
-  public RelNode getLeft() {
-    return left;
-  }
-
-  public RelNode getRight() {
-    return right;
-  }
-
   // TODO: enable
   public boolean isValid_(boolean fail) {
     if (!super.isValid(fail)) {
@@ -192,15 +173,8 @@ public abstract class Join extends AbstractRelNode {
     return variablesStopped;
   }
 
-  public void childrenAccept(RelVisitor visitor) {
-    visitor.visit(left, 0, this);
-    visitor.visit(right, 1, this);
-  }
-
   public RelWriter explainTerms(RelWriter pw) {
     return super.explainTerms(pw)
-        .input("left", left)
-        .input("right", right)
         .item("condition", condition)
         .item("joinType", joinType.name().toLowerCase())
         .itemIf(
@@ -209,21 +183,6 @@ public abstract class Join extends AbstractRelNode {
             !getSystemFieldList().isEmpty());
   }
 
-  public void replaceInput(
-      int ordinalInParent,
-      RelNode p) {
-    switch (ordinalInParent) {
-    case 0:
-      this.left = p;
-      break;
-    case 1:
-      this.right = p;
-      break;
-    default:
-      throw Util.newInternal();
-    }
-  }
-
   protected RelDataType deriveRowType() {
     return deriveJoinRowType(
         left.getRowType(),

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/core/Uncollect.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Uncollect.java b/core/src/main/java/org/apache/calcite/rel/core/Uncollect.java
index b31700b..2e76332 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Uncollect.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Uncollect.java
@@ -33,7 +33,8 @@ import java.util.List;
  * relation.
  *
  * <p>Like its inverse operation {@link Collect}, Uncollect is generally
- * invoked in a nested loop, driven by {@link Correlator} or similar.
+ * invoked in a nested loop, driven by
+ * {@link org.apache.calcite.rel.logical.LogicalCorrelate} or similar.
  */
 public class Uncollect extends SingleRel {
   //~ Constructors -----------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java b/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java
index ff6c570..fb85247 100644
--- a/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java
+++ b/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java
@@ -23,7 +23,7 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Correlation;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -225,8 +225,8 @@ public class RelJson {
       return value;
     } else if (value instanceof RexNode) {
       return toJson((RexNode) value);
-    } else if (value instanceof Correlation) {
-      return toJson((Correlation) value);
+    } else if (value instanceof CorrelationId) {
+      return toJson((CorrelationId) value);
     } else if (value instanceof List) {
       final List<Object> list = jsonBuilder.list();
       for (Object o : (List) value) {
@@ -281,11 +281,8 @@ public class RelJson {
     return map;
   }
 
-  private Object toJson(Correlation node) {
-    final Map<String, Object> map = jsonBuilder.map();
-    map.put("correlation", node.getId());
-    map.put("offset", node.getOffset());
-    return map;
+  private Object toJson(CorrelationId node) {
+    return node.getId();
   }
 
   private Object toJson(RexNode node) {


[2/3] incubator-calcite git commit: [CALCITE-483][CALCITE-489] Update Correlate mechanics and implement EnumerableCorrelate (aka nested loops join)

Posted by vl...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java b/core/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java
new file mode 100644
index 0000000..e5d3bd7
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/**
+ * A relational operator that performs nested-loop joins.
+ *
+ * <p>It behaves like a kind of {@link org.apache.calcite.rel.core.Join},
+ * but works by setting variables in its environment and restarting its
+ * right-hand input.
+ *
+ * <p>A LogicalCorrelate is used to represent a correlated query. One
+ * implementation strategy is to de-correlate the expression.
+ *
+ * @see org.apache.calcite.rel.core.CorrelationId
+ */
+public final class LogicalCorrelate extends Correlate {
+  //~ Instance fields --------------------------------------------------------
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a LogicalCorrelate.
+   * @param cluster      cluster this relational expression belongs to
+   * @param left         left input relational expression
+   * @param right        right input relational expression
+   * @param correlationId variable name for the row of left input
+   * @param requiredColumns
+   * @param joinType     join type
+   */
+  public LogicalCorrelate(
+      RelOptCluster cluster,
+      RelNode left,
+      RelNode right,
+      CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    super(
+        cluster,
+        cluster.traitSetOf(Convention.NONE),
+        left,
+        right,
+        correlationId,
+        requiredColumns,
+        joinType);
+  }
+
+  /**
+   * Creates a LogicalCorrelate by parsing serialized output.
+   */
+  public LogicalCorrelate(RelInput input) {
+    this(
+        input.getCluster(), input.getInputs().get(0),
+        input.getInputs().get(1),
+        new CorrelationId((Integer) input.get("correlationId")),
+        input.getBitSet("requiredColumns"),
+        input.getEnum("joinType", SemiJoinType.class));
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @Override public LogicalCorrelate copy(RelTraitSet traitSet,
+      RelNode left, RelNode right, CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    assert traitSet.containsIfApplicable(Convention.NONE);
+    return new LogicalCorrelate(
+        getCluster(),
+        left,
+        right,
+        correlationId,
+        requiredColumns,
+        joinType);
+  }
+
+  @Override public RelNode accept(RelShuttle shuttle) {
+    return shuttle.visit(this);
+  }
+}
+
+// End LogicalCorrelate.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java
index 6a6fab6..5128884 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java
@@ -18,7 +18,7 @@ package org.apache.calcite.rel.metadata;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Correlator;
+import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
@@ -72,7 +72,7 @@ public class RelMdColumnUniqueness {
   }
 
   public Boolean areColumnsUnique(
-      Correlator rel,
+      Correlate rel,
       ImmutableBitSet columns,
       boolean ignoreNulls) {
     return RelMetadataQuery.areColumnsUnique(

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
index a84a94e..d6a0275 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
@@ -18,7 +18,7 @@ package org.apache.calcite.rel.metadata;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Correlator;
+import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
@@ -62,7 +62,7 @@ public class RelMdUniqueKeys {
     return RelMetadataQuery.getUniqueKeys(rel.getInput(), ignoreNulls);
   }
 
-  public Set<ImmutableBitSet> getUniqueKeys(Correlator rel,
+  public Set<ImmutableBitSet> getUniqueKeys(Correlate rel,
       boolean ignoreNulls) {
     return RelMetadataQuery.getUniqueKeys(rel.getLeft(), ignoreNulls);
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java b/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
new file mode 100644
index 0000000..bcdd0aa
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+
+/**
+ * Rule that converts a {@link org.apache.calcite.rel.logical.LogicalJoin}
+ * into a {@link org.apache.calcite.rel.logical.LogicalCorrelate}, which can
+ * then be implemented using nested loops.
+ *
+ * <p>For example,</p>
+ *
+ * <blockquote><code>select * from emp join dept on emp.deptno =
+ * dept.deptno</code></blockquote>
+ *
+ * <p>becomes a Correlator which restarts LogicalTableScan("DEPT") for each
+ * row read from LogicalTableScan("EMP").</p>
+ *
+ * <p>This rule is not applicable if for certain types of outer join. For
+ * example,</p>
+ *
+ * <blockquote><code>select * from emp right join dept on emp.deptno =
+ * dept.deptno</code></blockquote>
+ *
+ * <p>would require emitting a NULL emp row if a certain department contained no
+ * employees, and Correlator cannot do that.</p>
+ */
+public class JoinToCorrelateRule extends RelOptRule {
+  //~ Static fields/initializers ---------------------------------------------
+
+  public static final JoinToCorrelateRule INSTANCE =
+      new JoinToCorrelateRule(RelFactories.DEFAULT_FILTER_FACTORY);
+
+  protected final RelFactories.FilterFactory filterFactory;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Private constructor; use singleton {@link #INSTANCE}.
+   */
+  protected JoinToCorrelateRule(RelFactories.FilterFactory filterFactory) {
+    super(operand(LogicalJoin.class, any()));
+    this.filterFactory = filterFactory;
+    assert filterFactory != null : "Filter factory should not be null";
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  public boolean matches(RelOptRuleCall call) {
+    LogicalJoin join = call.rel(0);
+    switch (join.getJoinType()) {
+    case INNER:
+    case LEFT:
+      return true;
+    case FULL:
+    case RIGHT:
+      return false;
+    default:
+      throw Util.unexpected(join.getJoinType());
+    }
+  }
+
+  public void onMatch(RelOptRuleCall call) {
+    assert matches(call);
+    final LogicalJoin join = call.rel(0);
+    RelNode right = join.getRight();
+    final RelNode left = join.getLeft();
+    final int leftFieldCount = left.getRowType().getFieldCount();
+    final RelOptCluster cluster = join.getCluster();
+    final RexBuilder rexBuilder = cluster.getRexBuilder();
+    final String dynInIdStr = cluster.getQuery().createCorrel();
+    final CorrelationId correlationId = new CorrelationId(dynInIdStr);
+    final RexNode corrVar =
+        rexBuilder.makeCorrel(left.getRowType(), correlationId.getName());
+    final ImmutableBitSet.Builder requiredColumns = ImmutableBitSet.builder();
+    RexNode joinCondition = join.getCondition();
+
+    // Replace all references of left input with FieldAccess(corrVar, field)
+    joinCondition = joinCondition.accept(new RexShuttle() {
+      @Override
+      public RexNode visitInputRef(RexInputRef input) {
+        int field = input.getIndex();
+        if (field >= leftFieldCount) {
+          return rexBuilder.makeInputRef(input.getType(),
+              input.getIndex() - leftFieldCount);
+        }
+        requiredColumns.set(field);
+        return rexBuilder.makeFieldAccess(corrVar, field);
+      }
+    });
+
+    joinCondition = RexUtil.flatten(rexBuilder, joinCondition);
+    final RelNode filteredRight =
+        RelOptUtil.createFilter(right, joinCondition, filterFactory);
+    RelNode newRel =
+        new LogicalCorrelate(
+            join.getCluster(),
+            left,
+            filteredRight,
+            correlationId,
+            requiredColumns.build(),
+            SemiJoinType.of(join.getJoinType()));
+    call.transformTo(newRel);
+  }
+}
+
+// End JoinToCorrelateRule.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelatorRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelatorRule.java b/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelatorRule.java
deleted file mode 100644
index 219b662..0000000
--- a/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelatorRule.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.rules;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptQuery;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Correlation;
-import org.apache.calcite.rel.core.Correlator;
-import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.util.Util;
-import org.apache.calcite.util.mapping.IntPair;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-/**
- * Rule that converts a {@link org.apache.calcite.rel.logical.LogicalJoin}
- * into a {@link org.apache.calcite.rel.core.Correlator}, which can
- * then be implemented using nested loops.
- *
- * <p>For example,</p>
- *
- * <blockquote><code>select * from emp join dept on emp.deptno =
- * dept.deptno</code></blockquote>
- *
- * <p>becomes a Correlator which restarts LogicalTableScan("DEPT") for each
- * row read from LogicalTableScan("EMP").</p>
- *
- * <p>This rule is not applicable if for certain types of outer join. For
- * example,</p>
- *
- * <blockquote><code>select * from emp right join dept on emp.deptno =
- * dept.deptno</code></blockquote>
- *
- * <p>would require emitting a NULL emp row if a certain department contained no
- * employees, and Correlator cannot do that.</p>
- */
-public class JoinToCorrelatorRule extends RelOptRule {
-  //~ Static fields/initializers ---------------------------------------------
-
-  public static final JoinToCorrelatorRule INSTANCE =
-      new JoinToCorrelatorRule();
-
-  //~ Constructors -----------------------------------------------------------
-
-  /**
-   * Private constructor; use singleton {@link #INSTANCE}.
-   */
-  private JoinToCorrelatorRule() {
-    super(operand(LogicalJoin.class, any()));
-  }
-
-  //~ Methods ----------------------------------------------------------------
-
-  public boolean matches(RelOptRuleCall call) {
-    LogicalJoin join = call.rel(0);
-    switch (join.getJoinType()) {
-    case INNER:
-    case LEFT:
-      return true;
-    case FULL:
-    case RIGHT:
-      return false;
-    default:
-      throw Util.unexpected(join.getJoinType());
-    }
-  }
-
-  public void onMatch(RelOptRuleCall call) {
-    assert matches(call);
-    final LogicalJoin join = call.rel(0);
-    RelNode right = join.getRight();
-    final RelNode left = join.getLeft();
-    final JoinInfo joinInfo = join.analyzeCondition();
-    final List<Correlation> correlationList = Lists.newArrayList();
-    final RelOptCluster cluster = join.getCluster();
-    final RexBuilder rexBuilder = cluster.getRexBuilder();
-    final List<RexNode> conditions = Lists.newArrayList();
-    for (IntPair p : joinInfo.pairs()) {
-      final String dynInIdStr = cluster.getQuery().createCorrel();
-      final int dynInId = RelOptQuery.getCorrelOrdinal(dynInIdStr);
-
-      // Create correlation to say 'each row, set variable #id
-      // to the value of column #leftKey'.
-      correlationList.add(new Correlation(dynInId, p.source));
-      conditions.add(
-          rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
-              rexBuilder.makeInputRef(right, p.target),
-              rexBuilder.makeCorrel(
-                  left.getRowType().getFieldList().get(p.source).getType(),
-                  dynInIdStr)));
-    }
-    final RelNode filteredRight = RelOptUtil.createFilter(right, conditions);
-    RelNode newRel =
-        new Correlator(
-            join.getCluster(),
-            left,
-            filteredRight,
-            joinInfo.getRemaining(join.getCluster().getRexBuilder()),
-            correlationList,
-            join.getJoinType());
-    call.transformTo(newRel);
-  }
-}
-
-// End JoinToCorrelatorRule.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java
index 5379110..6c275a2 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java
@@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexNode;
@@ -67,6 +68,9 @@ public class ProjectJoinTransposeRule extends RelOptRule {
     LogicalProject origProj = call.rel(0);
     final Join join = call.rel(1);
 
+    if (join instanceof SemiJoin) {
+      return; // TODO: support SemiJoin
+    }
     // locate all fields referenced in the projection and join condition;
     // determine which inputs are referenced in the projection and
     // join condition; if all fields are being referenced and there are no

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java b/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
index cfaf740..867cf0e 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
@@ -20,6 +20,7 @@ import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -213,7 +214,7 @@ public class PushProjector {
       List<RelDataTypeField> rightFields =
           joinRel.getRight().getRowType().getFieldList();
       nFields = leftFields.size();
-      nFieldsRight = rightFields.size();
+      nFieldsRight = childRel instanceof SemiJoin ? 0 : rightFields.size();
       nSysFields = joinRel.getSystemFieldList().size();
       childBitmap =
           ImmutableBitSet.range(nSysFields, nFields + nSysFields);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java b/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
index c795c23..9825367 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
@@ -77,7 +77,7 @@ public class RexExecutorImpl implements RelOptPlanner.Executor {
             Expressions.convert_(root0_, DataContext.class)));
     final List<Expression> expressions =
         RexToLixTranslator.translateProjects(programBuilder.getProgram(),
-        javaTypeFactory, blockBuilder, null, getter);
+        javaTypeFactory, blockBuilder, null, getter, null);
     blockBuilder.add(
         Expressions.return_(null,
             Expressions.newArrayInit(Object[].class, expressions)));

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql/SemiJoinType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SemiJoinType.java b/core/src/main/java/org/apache/calcite/sql/SemiJoinType.java
new file mode 100644
index 0000000..50b3659
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/SemiJoinType.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import org.apache.calcite.linq4j.CorrelateJoinType;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Enumeration representing different join types used in correlation
+ * relations.
+ */
+public enum SemiJoinType implements SqlLiteral.SqlSymbol {
+  /**
+   * Inner join
+   */
+  INNER,
+
+  /**
+   * Left-outer join
+   */
+  LEFT,
+
+  /**
+   * Semi-join
+   * <p>Similar to from A ... where a in (select b from B ...)</p>
+   */
+  SEMI,
+
+  /**
+   * Anti-join
+   * <p>Similar to from A ... where a NOT in (select b from B ...)</p>
+   * <p>Note: if B.b is nullable and B has nulls, no rows must be returned</p>
+   */
+  ANTI;
+
+  /**
+   * Creates a parse-tree node representing an occurrence of this
+   * condition type keyword at a particular position in the parsed
+   * text.
+   */
+  public SqlLiteral symbol(SqlParserPos pos) {
+    return SqlLiteral.createSymbol(this, pos);
+  }
+
+  public static SemiJoinType of(JoinRelType joinType) {
+    switch (joinType) {
+    case INNER:
+      return INNER;
+    case LEFT:
+      return LEFT;
+    }
+    throw new IllegalArgumentException(
+        "Unsupported join type for semi-join " + joinType);
+  }
+
+  public JoinRelType toJoinType() {
+    switch (this) {
+    case INNER:
+      return JoinRelType.INNER;
+    case LEFT:
+      return JoinRelType.LEFT;
+    }
+    throw new IllegalStateException(
+        "Unable to convert " + this + " to JoinRelType");
+  }
+
+  public CorrelateJoinType toLinq4j() {
+    switch (this) {
+    case INNER:
+      return CorrelateJoinType.INNER;
+    case LEFT:
+      return CorrelateJoinType.LEFT;
+    case SEMI:
+      return CorrelateJoinType.SEMI;
+    case ANTI:
+      return CorrelateJoinType.ANTI;
+    }
+    throw new IllegalStateException(
+        "Unable to convert " + this + " to JoinRelType");
+  }
+
+  public boolean returnsJustFirstInput() {
+    switch (this) {
+    case INNER:
+    case LEFT:
+      return false;
+    case SEMI:
+    case ANTI:
+      return true;
+    }
+    throw new IllegalStateException(
+        "Unable to convert " + this + " to JoinRelType");
+  }
+}
+
+// End SemiJoinType.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java b/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
new file mode 100644
index 0000000..206ed2b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql2rel;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Rewrites relations to ensure the same correlation is referenced by the same
+ * correlation variable.
+ */
+public class DeduplicateCorrelateVariables extends RelShuttleImpl {
+  private final RexShuttle dedupRex;
+
+  /**
+   * Replaces alternative names of correlation variable to its canonical name.
+   */
+  private static class DeduplicateCorrelateVariablesShuttle extends RexShuttle {
+    private final RexBuilder builder;
+    private final String canonical;
+    private final Set<String> altNames;
+
+    public DeduplicateCorrelateVariablesShuttle(RexBuilder builder,
+        String canonical, Set<String> altNames) {
+      this.builder = builder;
+      this.canonical = canonical;
+      this.altNames = altNames;
+    }
+
+    @Override
+    public RexNode visitCorrelVariable(RexCorrelVariable variable) {
+      if (!altNames.contains(variable.getName())) {
+        return variable;
+      }
+
+      return builder.makeCorrel(variable.getType(), canonical);
+    }
+  }
+
+  public DeduplicateCorrelateVariables(RexBuilder builder,
+      String canonical, Set<String> altNames) {
+    dedupRex = new DeduplicateCorrelateVariablesShuttle(builder,
+        canonical, altNames);
+  }
+
+  @Override
+  public RelNode visit(LogicalFilter filter) {
+    LogicalFilter newFilter = (LogicalFilter) super.visit(filter);
+    RexNode condition = filter.getCondition();
+    RexNode newCondition = condition.accept(dedupRex);
+    if (condition != newCondition) {
+      return newFilter.copy(newFilter.getTraitSet(), newFilter.getInput(),
+          newCondition);
+    }
+    return newFilter;
+  }
+
+  @Override
+  public RelNode visit(LogicalProject project) {
+    LogicalProject project2 = (LogicalProject) super.visit(project);
+    List<RexNode> childExps = project2.getChildExps();
+    List<RexNode> newExps = dedupRex.apply(childExps);
+    if (childExps != newExps) {
+      return project2.copy(project2.getTraitSet(), project2.getInput(),
+          newExps, project2.getRowType());
+    }
+    return project2;
+  }
+
+  @Override
+  public RelNode visit(LogicalJoin join) {
+    LogicalJoin join2 = (LogicalJoin) super.visit(join);
+    RexNode condition = join2.getCondition();
+    RexNode newCondition = condition.accept(dedupRex);
+    if (condition != newCondition) {
+      return join2.copy(join2.getTraitSet(), newCondition, join2.getLeft(),
+          join2.getRight(), join2.getJoinType(), join2.isSemiJoinDone());
+    }
+    return join2;
+  }
+}
+
+// End DeduplicateCorrelateVariables.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index 5f2ef57..bff404f 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -28,18 +28,18 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Correlation;
-import org.apache.calcite.rel.core.Correlator;
-import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
@@ -149,7 +149,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
   private final Map<RelNode, Map<Integer, Integer>>
   mapNewRelToMapOldToNewOutputPos = Maps.newHashMap();
 
-  private final HashSet<Correlator> generatedCorRels = Sets.newHashSet();
+  private final HashSet<LogicalCorrelate> generatedCorRels = Sets.newHashSet();
 
   //~ Constructors -----------------------------------------------------------
 
@@ -173,7 +173,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * @param rootRel Root node of the query
    *
    * @return Equivalent query with all
-   * {@link org.apache.calcite.rel.core.Correlator} instances removed
+   * {@link org.apache.calcite.rel.logical.LogicalCorrelate} instances removed
    */
   public static RelNode decorrelateQuery(RelNode rootRel) {
     final CorelMap corelMap = CorelMap.build(rootRel);
@@ -206,7 +206,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     return newRootRel;
   }
 
-  private void setCurrent(RelNode root, Correlator corRel) {
+  private void setCurrent(RelNode root, LogicalCorrelate corRel) {
     currentRel = corRel;
     if (corRel != null) {
       cm = CorelMap.build(Util.first(root, corRel));
@@ -249,17 +249,16 @@ public class RelDecorrelator implements ReflectiveVisitor {
           cm.mapRefRelToCorVar.putAll(newNode,
               cm.mapRefRelToCorVar.get(oldNode));
         }
-        if (oldNode instanceof Correlator
-            && newNode instanceof Correlator) {
-          Correlator oldCor = (Correlator) oldNode;
-          for (Correlation c : oldCor.getCorrelations()) {
-            if (cm.mapCorVarToCorRel.get(c) == oldNode) {
-              cm.mapCorVarToCorRel.put(c, (Correlator) newNode);
-            }
+        if (oldNode instanceof LogicalCorrelate
+            && newNode instanceof LogicalCorrelate) {
+          LogicalCorrelate oldCor = (LogicalCorrelate) oldNode;
+          CorrelationId c = oldCor.getCorrelationId();
+          if (cm.mapCorVarToCorRel.get(c) == oldNode) {
+            cm.mapCorVarToCorRel.put(c, (LogicalCorrelate) newNode);
           }
 
           if (generatedCorRels.contains(oldNode)) {
-            generatedCorRels.add((Correlator) newNode);
+            generatedCorRels.add((LogicalCorrelate) newNode);
           }
         }
         return null;
@@ -716,9 +715,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
     // inputRel provides the definition of a correlated variable.
     // Add to map all the referenced positions(relative to each input rel)
     for (Correlation corVar : correlations) {
-      int oldCorVarOffset = corVar.getOffset();
+      int oldCorVarOffset = corVar.field;
 
-      oldInputRel = cm.mapCorVarToCorRel.get(corVar).getInput(0);
+      oldInputRel = cm.mapCorVarToCorRel.get(corVar.corr).getInput(0);
       assert oldInputRel != null;
       newInputRel = mapOldToNewRel.get(oldInputRel);
       assert newInputRel != null;
@@ -753,7 +752,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     Set<RelNode> joinedInputRelSet = Sets.newHashSet();
 
     for (Correlation corVar : correlations) {
-      oldInputRel = cm.mapCorVarToCorRel.get(corVar).getInput(0);
+      oldInputRel = cm.mapCorVarToCorRel.get(corVar.corr).getInput(0);
       assert oldInputRel != null;
       newInputRel = mapOldToNewRel.get(oldInputRel);
       assert newInputRel != null;
@@ -795,17 +794,17 @@ public class RelDecorrelator implements ReflectiveVisitor {
       // The first child of a correlatorRel is always the rel defining
       // the correlated variables.
       newInputRel =
-          mapOldToNewRel.get(cm.mapCorVarToCorRel.get(corVar).getInput(0));
+          mapOldToNewRel.get(cm.mapCorVarToCorRel.get(corVar.corr).getInput(0));
       newLocalOutputPosList = mapNewInputRelToOutputPos.get(newInputRel);
 
       Map<Integer, Integer> mapOldToNewOutputPos =
           mapNewRelToMapOldToNewOutputPos.get(newInputRel);
       assert mapOldToNewOutputPos != null;
 
-      newLocalOutputPos = mapOldToNewOutputPos.get(corVar.getOffset());
+      newLocalOutputPos = mapOldToNewOutputPos.get(corVar.field);
 
       // newOutputPos is the index of the cor var in the referenced
-      // position list plus the offset of referenced postition list of
+      // position list plus the offset of referenced position list of
       // each newInputRel.
       newOutputPos =
           newLocalOutputPosList.indexOf(newLocalOutputPos)
@@ -948,7 +947,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
    *
    * @param rel Correlator
    */
-  public void decorrelateRel(Correlator rel) {
+  public void decorrelateRel(LogicalCorrelate rel) {
     //
     // Rewrite logic:
     //
@@ -989,13 +988,13 @@ public class RelDecorrelator implements ReflectiveVisitor {
     SortedMap<Correlation, Integer> mapCorVarToOutputPos =
         rightChildMapCorVarToOutputPos;
 
-    assert rel.getCorrelations().size()
+    assert rel.getRequiredColumns().cardinality()
         <= rightChildMapCorVarToOutputPos.keySet().size();
 
     // Change correlator rel into a join.
     // Join all the correlated variables produced by this correlator rel
     // with the values generated and propagated from the right input
-    RexNode condition = rel.getCondition();
+    RexNode condition = rexBuilder.makeLiteral(true);
     final List<RelDataTypeField> newLeftOutput =
         newLeftRel.getRowType().getFieldList();
     int newLeftFieldCount = newLeftOutput.size();
@@ -1005,8 +1004,13 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     int newLeftPos;
     int newRightPos;
-    for (Correlation corVar : rel.getCorrelations()) {
-      newLeftPos = leftChildMapOldToNewOutputPos.get(corVar.getOffset());
+    for (Map.Entry<Correlation, Integer> rightOutputPos
+        : Lists.newArrayList(rightChildMapCorVarToOutputPos.entrySet())) {
+      Correlation corVar = rightOutputPos.getKey();
+      if (!corVar.corr.equals(rel.getCorrelationId())) {
+        continue;
+      }
+      newLeftPos = leftChildMapOldToNewOutputPos.get(corVar.field);
       newRightPos = rightChildMapCorVarToOutputPos.get(corVar);
       RexNode equi =
           rexBuilder.makeCall(
@@ -1070,7 +1074,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
             newLeftRel,
             newRightRel,
             condition,
-            rel.getJoinType(),
+            rel.getJoinType().toJoinType(),
             variablesStopped);
 
     mapOldToNewRel.put(rel, newRel);
@@ -1288,11 +1292,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * @return the subtree with the new LogicalProject at the root
    */
   private RelNode aggregateCorrelatorOutput(
-      Correlator corRel,
+      LogicalCorrelate corRel,
       LogicalProject projRel,
       Set<Integer> isCount) {
     RelNode leftInputRel = corRel.getLeft();
-    JoinRelType joinType = corRel.getJoinType();
+    JoinRelType joinType = corRel.getJoinType().toJoinType();
 
     // now create the new project
     List<Pair<RexNode, String>> newProjects = Lists.newArrayList();
@@ -1335,7 +1339,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * @return true if filter and proj only references corVar provided by corRel
    */
   private boolean checkCorVars(
-      Correlator corRel,
+      LogicalCorrelate corRel,
       LogicalProject projRel,
       LogicalFilter filter,
       List<RexFieldAccess> correlatedJoinKeys) {
@@ -1361,7 +1365,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       corVarInFilter.addAll(cm.mapRefRelToCorVar.get(filter));
 
       for (Correlation corVar : corVarInFilter) {
-        if (cm.mapCorVarToCorRel.get(corVar) != corRel) {
+        if (cm.mapCorVarToCorRel.get(corVar.corr) != corRel) {
           return false;
         }
       }
@@ -1372,7 +1376,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     // of the corRel.
     if ((projRel != null) && cm.mapRefRelToCorVar.containsKey(projRel)) {
       for (Correlation corVar : cm.mapRefRelToCorVar.get(projRel)) {
-        if (cm.mapCorVarToCorRel.get(corVar) != corRel) {
+        if (cm.mapCorVarToCorRel.get(corVar.corr) != corRel) {
           return false;
         }
       }
@@ -1386,11 +1390,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
    *
    * @param corRel Correlator
    */
-  private void removeCorVarFromTree(Correlator corRel) {
-    for (Correlation c : Lists.newArrayList(cm.mapCorVarToCorRel.keySet())) {
-      if (cm.mapCorVarToCorRel.get(c) == corRel) {
-        cm.mapCorVarToCorRel.remove(c);
-      }
+  private void removeCorVarFromTree(LogicalCorrelate corRel) {
+    if (cm.mapCorVarToCorRel.get(corRel.getCorrelationId()) == corRel) {
+      cm.mapCorVarToCorRel.remove(corRel.getCorrelationId());
     }
   }
 
@@ -1618,7 +1620,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
         // corVar offset should point to the leftInput of currentRel,
         // which is the Correlator.
         RexNode newRexNode =
-            new RexInputRef(corVar.getOffset(), fieldAccess.getType());
+            new RexInputRef(corVar.field, fieldAccess.getType());
 
         if (projectPulledAboveLeftCorrelator
             && (nullIndicator != null)) {
@@ -1637,14 +1639,14 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     // override RexShuttle
     public RexNode visitInputRef(RexInputRef inputRef) {
-      if ((currentRel != null) && (currentRel instanceof Correlator)) {
+      if ((currentRel != null) && (currentRel instanceof LogicalCorrelate)) {
         // if this rel references corVar
         // and now it needs to be rewritten
         // it must have been pulled above the Correlator
         // replace the input ref to account for the LHS of the
         // Correlator
         int leftInputFieldCount =
-            ((Correlator) currentRel).getLeft().getRowType()
+            ((LogicalCorrelate) currentRel).getLeft().getRowType()
                 .getFieldCount();
         RelDataType newType = inputRef.getType();
 
@@ -1803,7 +1805,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
   private final class RemoveCorrelationForScalarProjectRule extends RelOptRule {
     public RemoveCorrelationForScalarProjectRule() {
       super(
-          operand(Correlator.class,
+          operand(LogicalCorrelate.class,
               operand(RelNode.class, any()),
               operand(LogicalAggregate.class,
                   operand(LogicalProject.class,
@@ -1811,7 +1813,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     }
 
     public void onMatch(RelOptRuleCall call) {
-      Correlator corRel = call.rel(0);
+      LogicalCorrelate corRel = call.rel(0);
       RelNode leftInputRel = call.rel(1);
       LogicalAggregate aggRel = call.rel(2);
       LogicalProject projRel = call.rel(3);
@@ -1829,8 +1831,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
       //   LogicalAggregate (groupby (0) single_value())
       //     LogicalProject-A (may reference coVar)
       //       RightInputRel
-      JoinRelType joinType = corRel.getJoinType();
-      RexNode joinCond = corRel.getCondition();
+      JoinRelType joinType = corRel.getJoinType().toJoinType();
+
+      // corRel.getCondition was here, however Correlate was updated so it
+      // never includes a join condition. The code was not modified for brevity.
+      RexNode joinCond = rexBuilder.makeLiteral(true);
       if ((joinType != JoinRelType.LEFT)
           || (joinCond != rexBuilder.makeLiteral(true))) {
         return;
@@ -2004,7 +2009,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       extends RelOptRule {
     public RemoveCorrelationForScalarAggregateRule() {
       super(
-          operand(Correlator.class,
+          operand(LogicalCorrelate.class,
               operand(RelNode.class, any()),
               operand(LogicalProject.class,
                   operand(LogicalAggregate.class, null, Aggregate.IS_SIMPLE,
@@ -2013,7 +2018,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     }
 
     public void onMatch(RelOptRuleCall call) {
-      Correlator corRel = call.rel(0);
+      LogicalCorrelate corRel = call.rel(0);
       RelNode leftInputRel = call.rel(1);
       LogicalProject aggOutputProjRel = call.rel(2);
       LogicalAggregate aggRel = call.rel(3);
@@ -2040,8 +2045,10 @@ public class RelDecorrelator implements ReflectiveVisitor {
         return;
       }
 
-      JoinRelType joinType = corRel.getJoinType();
-      RexNode joinCond = corRel.getCondition();
+      JoinRelType joinType = corRel.getJoinType().toJoinType();
+      // corRel.getCondition was here, however Correlate was updated so it
+      // never includes a join condition. The code was not modified for brevity.
+      RexNode joinCond = rexBuilder.makeLiteral(true);
       if ((joinType != JoinRelType.LEFT)
           || (joinCond != rexBuilder.makeLiteral(true))) {
         return;
@@ -2389,18 +2396,18 @@ public class RelDecorrelator implements ReflectiveVisitor {
     public AdjustProjectForCountAggregateRule(boolean flavor) {
       super(
           flavor
-              ? operand(Correlator.class,
+              ? operand(LogicalCorrelate.class,
                   operand(RelNode.class, any()),
                       operand(LogicalProject.class,
                           operand(LogicalAggregate.class, any())))
-              : operand(Correlator.class,
+              : operand(LogicalCorrelate.class,
                   operand(RelNode.class, any()),
                       operand(LogicalAggregate.class, any())));
       this.flavor = flavor;
     }
 
     public void onMatch(RelOptRuleCall call) {
-      Correlator corRel = call.rel(0);
+      LogicalCorrelate corRel = call.rel(0);
       RelNode leftInputRel = call.rel(1);
       LogicalProject aggOutputProjRel;
       LogicalAggregate aggRel;
@@ -2428,7 +2435,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     private void onMatch2(
         RelOptRuleCall call,
-        Correlator corRel,
+        LogicalCorrelate corRel,
         RelNode leftInputRel,
         LogicalProject aggOutputProjRel,
         LogicalAggregate aggRel) {
@@ -2457,8 +2464,10 @@ public class RelDecorrelator implements ReflectiveVisitor {
         return;
       }
 
-      JoinRelType joinType = corRel.getJoinType();
-      RexNode joinCond = corRel.getCondition();
+      JoinRelType joinType = corRel.getJoinType().toJoinType();
+      // corRel.getCondition was here, however Correlate was updated so it
+      // never includes a join condition. The code was not modified for brevity.
+      RexNode joinCond = rexBuilder.makeLiteral(true);
       if ((joinType != JoinRelType.LEFT)
           || (joinCond != rexBuilder.makeLiteral(true))) {
         return;
@@ -2489,12 +2498,13 @@ public class RelDecorrelator implements ReflectiveVisitor {
       //     LeftInputRel
       //     LogicalAggregate (groupby (0), agg0(), agg1()...)
       //
-      Correlator newCorRel =
-          new Correlator(
+      LogicalCorrelate newCorRel =
+          new LogicalCorrelate(
               cluster,
               leftInputRel,
               aggRel,
-              corRel.getCorrelations(),
+              corRel.getCorrelationId(),
+              corRel.getRequiredColumns(),
               corRel.getJoinType());
 
       // remember this rel so we don't fire rule on it again
@@ -2506,10 +2516,8 @@ public class RelDecorrelator implements ReflectiveVisitor {
       // need to update the mapCorVarToCorRel Update the output position
       // for the cor vars: only pass on the cor vars that are not used in
       // the join key.
-      for (Correlation c : Lists.newArrayList(cm.mapCorVarToCorRel.keySet())) {
-        if (cm.mapCorVarToCorRel.get(c) == corRel) {
-          cm.mapCorVarToCorRel.put(c, newCorRel);
-        }
+      if (cm.mapCorVarToCorRel.get(corRel.getCorrelationId()) == corRel) {
+        cm.mapCorVarToCorRel.put(corRel.getCorrelationId(), newCorRel);
       }
 
       RelNode newOutputRel =
@@ -2519,7 +2527,39 @@ public class RelDecorrelator implements ReflectiveVisitor {
     }
   }
 
-  /** A map of the locations of {@link org.apache.calcite.rel.core.Correlator}
+  /**
+   * {@code Correlation} here represents a unique reference to a correlation
+   * field.
+   * For instance, if a RelNode references emp.name multiple times, it would
+   * result in multiple {@code Correlation} objects that differ just in
+   * {@link Correlation#uniqueKey}.
+   */
+  static class Correlation
+      implements Comparable<Correlation> {
+    public final int uniqueKey;
+    public final CorrelationId corr;
+    public final int field;
+
+    Correlation(CorrelationId corr, int field, int uniqueKey) {
+      this.corr = corr;
+      this.field = field;
+      this.uniqueKey = uniqueKey;
+    }
+
+    public int compareTo(Correlation o) {
+      int res = corr.compareTo(o.corr);
+      if (res != 0) {
+        return res;
+      }
+      if (field != o.field) {
+        return field - o.field;
+      }
+      return uniqueKey - o.uniqueKey;
+    }
+  }
+
+  /** A map of the locations of
+   * {@link org.apache.calcite.rel.logical.LogicalCorrelate}
    * in a tree of {@link RelNode}s.
    *
    * <p>It is used to drive the decorrelation process.
@@ -2541,12 +2581,12 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * </ol> */
   private static class CorelMap {
     private final Multimap<RelNode, Correlation> mapRefRelToCorVar;
-    private final SortedMap<Correlation, Correlator> mapCorVarToCorRel;
+    private final SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel;
     private final Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar;
 
     // TODO: create immutable copies of all maps
     private CorelMap(Multimap<RelNode, Correlation> mapRefRelToCorVar,
-        SortedMap<Correlation, Correlator> mapCorVarToCorRel,
+        SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel,
         Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar) {
       this.mapRefRelToCorVar = mapRefRelToCorVar;
       this.mapCorVarToCorRel = mapCorVarToCorRel;
@@ -2578,7 +2618,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     /** Creates a CorelMap with given contents. */
     public static CorelMap of(
         SortedSetMultimap<RelNode, Correlation> mapRefRelToCorVar,
-        SortedMap<Correlation, Correlator> mapCorVarToCorRel,
+        SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel,
         Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar) {
       return new CorelMap(mapRefRelToCorVar, mapCorVarToCorRel,
           mapFieldAccessToCorVar);
@@ -2586,8 +2626,8 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     /** Creates a CorelMap by iterating over a {@link RelNode} tree. */
     public static CorelMap build(RelNode rel) {
-      final SortedMap<Correlation, Correlator> mapCorVarToCorRel =
-          new TreeMap<Correlation, Correlator>();
+      final SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel =
+          new TreeMap<CorrelationId, LogicalCorrelate>();
 
       final SortedSetMultimap<RelNode, Correlation> mapRefRelToCorVar =
           Multimaps.newSortedSetMultimap(
@@ -2602,11 +2642,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
       final Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar =
           new HashMap<RexFieldAccess, Correlation>();
 
-      final Map<String, Correlation> mapNameToCorVar = Maps.newHashMap();
-
       final Holder<Integer> offset = Holder.of(0);
+      final int[] corrIdGenerator = new int[1];
       final RelShuttleImpl shuttle = new RelShuttleImpl() {
         @Override public RelNode visit(LogicalJoin join) {
+          join.getCondition().accept(rexVisitor(join));
           return visitJoin(join);
         }
 
@@ -2615,20 +2655,16 @@ public class RelDecorrelator implements ReflectiveVisitor {
           return super.visitChild(parent, i, stripHep(child));
         }
 
-        @Override public RelNode visit(Correlator correlator) {
-          for (Correlation c : correlator.getCorrelations()) {
-            mapNameToCorVar.put("$cor" + c.getId(), c);
-            mapCorVarToCorRel.put(c, correlator);
-          }
-          return visitJoin(correlator);
+        @Override public RelNode visit(LogicalCorrelate correlate) {
+          mapCorVarToCorRel.put(correlate.getCorrelationId(), correlate);
+          return visitJoin(correlate);
         }
 
-        private Join visitJoin(Join join) {
-          join.getCondition().accept(rexVisitor(join));
+        private RelNode visitJoin(BiRel join) {
           final int x = offset.get();
           visitChild(join, 0, join.getLeft());
           offset.set(x + join.getLeft().getRowType().getFieldCount());
-          visitChild(join, 0, join.getRight());
+          visitChild(join, 1, join.getRight());
           offset.set(x);
           return join;
         }
@@ -2652,7 +2688,10 @@ public class RelDecorrelator implements ReflectiveVisitor {
               if (ref instanceof RexCorrelVariable) {
                 final RexCorrelVariable var = (RexCorrelVariable) ref;
                 final Correlation correlation =
-                    mapNameToCorVar.get(var.getName());
+                    new Correlation(
+                        new CorrelationId(var.getName()),
+                        fieldAccess.getField().getIndex(),
+                        corrIdGenerator[0]++);
                 mapFieldAccessToCorVar.put(fieldAccess, correlation);
                 mapRefRelToCorVar.put(rel, correlation);
               }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
index f3557c6..c66b1a2 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
@@ -25,13 +25,13 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.core.Collect;
-import org.apache.calcite.rel.core.Correlation;
-import org.apache.calcite.rel.core.Correlator;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Sample;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -61,6 +61,7 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ReflectUtil;
 import org.apache.calcite.util.ReflectiveVisitDispatcher;
 import org.apache.calcite.util.ReflectiveVisitor;
@@ -143,10 +144,10 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
   //~ Methods ----------------------------------------------------------------
 
   public void updateRelInMap(
-      SortedSetMultimap<RelNode, Correlation> mapRefRelToCorVar) {
+      SortedSetMultimap<RelNode, CorrelationId> mapRefRelToCorVar) {
     for (RelNode rel : Lists.newArrayList(mapRefRelToCorVar.keySet())) {
       if (oldToNewRelMap.containsKey(rel)) {
-        SortedSet<Correlation> corVarSet =
+        SortedSet<CorrelationId> corVarSet =
             mapRefRelToCorVar.removeAll(rel);
         mapRefRelToCorVar.putAll(oldToNewRelMap.get(rel), corVarSet);
       }
@@ -154,13 +155,13 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
   }
 
   public void updateRelInMap(
-      SortedMap<Correlation, Correlator> mapCorVarToCorRel) {
-    for (Correlation corVar : mapCorVarToCorRel.keySet()) {
-      Correlator oldRel = mapCorVarToCorRel.get(corVar);
+      SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel) {
+    for (CorrelationId corVar : mapCorVarToCorRel.keySet()) {
+      LogicalCorrelate oldRel = mapCorVarToCorRel.get(corVar);
       if (oldToNewRelMap.containsKey(oldRel)) {
         RelNode newRel = oldToNewRelMap.get(oldRel);
-        assert newRel instanceof Correlator;
-        mapCorVarToCorRel.put(corVar, (Correlator) newRel);
+        assert newRel instanceof LogicalCorrelate;
+        mapCorVarToCorRel.put(corVar, (LogicalCorrelate) newRel);
       }
     }
   }
@@ -417,28 +418,24 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
     setNewForOldRel(rel, newRel);
   }
 
-  public void rewriteRel(Correlator rel) {
-    final List<Correlation> newCorrelations =
-        new ArrayList<Correlation>();
-    for (Correlation c : rel.getCorrelations()) {
+  public void rewriteRel(LogicalCorrelate rel) {
+    ImmutableBitSet.Builder newPos = ImmutableBitSet.builder();
+    for (Integer pos : rel.getRequiredColumns()) {
       RelDataType corrFieldType =
-          rel.getLeft().getRowType().getFieldList().get(c.getOffset())
+          rel.getLeft().getRowType().getFieldList().get(pos)
               .getType();
       if (corrFieldType.isStruct()) {
         throw Util.needToImplement("correlation on structured type");
       }
-      newCorrelations.add(
-          new Correlation(
-              c.getId(),
-              getNewForOldInput(c.getOffset())));
+      newPos.set(getNewForOldInput(pos));
     }
-    Correlator newRel =
-        new Correlator(
+    LogicalCorrelate newRel =
+        new LogicalCorrelate(
             rel.getCluster(),
             getNewForOldRel(rel.getLeft()),
             getNewForOldRel(rel.getRight()),
-            rel.getCondition(),
-            newCorrelations,
+            rel.getCorrelationId(),
+            newPos.build(),
             rel.getJoinType());
     setNewForOldRel(rel, newRel);
   }
@@ -700,7 +697,6 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
   public void rewriteRel(LogicalTableScan rel) {
     RelNode newRel =
         rel.getTable().toRel(toRelContext);
-
     setNewForOldRel(rel, newRel);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 2d0cf6c..09b074d 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -30,11 +30,11 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Collect;
-import org.apache.calcite.rel.core.Correlation;
-import org.apache.calcite.rel.core.Correlator;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -43,6 +43,7 @@ import org.apache.calcite.rel.core.Sample;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalMinus;
@@ -75,6 +76,7 @@ import org.apache.calcite.schema.ModifiableTable;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.sql.JoinConditionType;
 import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
@@ -2049,7 +2051,14 @@ public class SqlToRelConverter {
 
     Set<String> correlatedVariables = RelOptUtil.getVariablesUsed(rightRel);
     if (correlatedVariables.size() > 0) {
-      final List<Correlation> correlations = Lists.newArrayList();
+      final ImmutableBitSet.Builder requiredColumns = ImmutableBitSet.builder();
+      final List<String> correlNames = Lists.newArrayList();
+
+      // All correlations must refer the same namespace since correlation
+      // produces exactly one correlation source.
+      // The same source might be referenced by different variables since
+      // DeferredLookups are not de-duplicated at create time.
+      SqlValidatorNamespace prevNs = null;
 
       for (String correlName : correlatedVariables) {
         DeferredLookup lookup = mapCorrelToDeferred.get(correlName);
@@ -2073,66 +2082,86 @@ public class SqlToRelConverter {
         SqlValidatorScope ancestorScope = ancestorScopes[0];
         boolean correlInCurrentScope = ancestorScope == bb.scope;
 
-        if (correlInCurrentScope) {
-          int namespaceOffset = 0;
-          if (childNamespaceIndex > 0) {
-            // If not the first child, need to figure out the width
-            // of output types from all the preceding namespaces
-            assert ancestorScope instanceof ListScope;
-            List<SqlValidatorNamespace> children =
-                ((ListScope) ancestorScope).getChildren();
-
-            for (int i = 0; i < childNamespaceIndex; i++) {
-              SqlValidatorNamespace child = children.get(i);
-              namespaceOffset +=
-                  child.getRowType().getFieldCount();
-            }
-          }
+        if (!correlInCurrentScope) {
+          continue;
+        }
 
-          RelDataTypeField field =
-              catalogReader.field(foundNs.getRowType(), originalFieldName);
-          int pos = namespaceOffset + field.getIndex();
-
-          assert field.getType()
-              == lookup.getFieldAccess(correlName).getField().getType();
-
-          assert pos != -1;
-
-          if (bb.mapRootRelToFieldProjection.containsKey(bb.root)) {
-            // bb.root is an aggregate and only projects group by
-            // keys.
-            Map<Integer, Integer> exprProjection =
-                bb.mapRootRelToFieldProjection.get(bb.root);
-
-            // subquery can reference group by keys projected from
-            // the root of the outer relation.
-            if (exprProjection.containsKey(pos)) {
-              pos = exprProjection.get(pos);
-            } else {
-              // correl not grouped
-              throw Util.newInternal(
-                  "Identifier '" + originalRelName + "."
-                  + originalFieldName + "' is not a group expr");
-            }
+        if (prevNs == null) {
+          prevNs = foundNs;
+        } else {
+          assert prevNs == foundNs : "All correlation variables should resolve"
+              + " to the same namespace."
+              + " Prev ns=" + prevNs
+              + ", new ns=" + foundNs;
+        }
+
+        int namespaceOffset = 0;
+        if (childNamespaceIndex > 0) {
+          // If not the first child, need to figure out the width
+          // of output types from all the preceding namespaces
+          assert ancestorScope instanceof ListScope;
+          List<SqlValidatorNamespace> children =
+              ((ListScope) ancestorScope).getChildren();
+
+          for (int i = 0; i < childNamespaceIndex; i++) {
+            SqlValidatorNamespace child = children.get(i);
+            namespaceOffset +=
+                child.getRowType().getFieldCount();
           }
+        }
+
+        RelDataTypeField field =
+            catalogReader.field(foundNs.getRowType(), originalFieldName);
+        int pos = namespaceOffset + field.getIndex();
+
+        assert field.getType()
+            == lookup.getFieldAccess(correlName).getField().getType();
 
-          Correlation newCorVar =
-              new Correlation(
-                  getCorrelOrdinal(correlName),
-                  pos);
+        assert pos != -1;
 
-          correlations.add(newCorVar);
+        if (bb.mapRootRelToFieldProjection.containsKey(bb.root)) {
+          // bb.root is an aggregate and only projects group by
+          // keys.
+          Map<Integer, Integer> exprProjection =
+              bb.mapRootRelToFieldProjection.get(bb.root);
+
+          // subquery can reference group by keys projected from
+          // the root of the outer relation.
+          if (exprProjection.containsKey(pos)) {
+            pos = exprProjection.get(pos);
+          } else {
+            // correl not grouped
+            throw Util.newInternal(
+                "Identifier '" + originalRelName + "."
+                + originalFieldName + "' is not a group expr");
+          }
         }
+
+        requiredColumns.set(pos);
+        correlNames.add(correlName);
       }
 
-      if (!correlations.isEmpty()) {
-        return new Correlator(
+      if (!correlNames.isEmpty()) {
+        if (correlNames.size() > 1) {
+          // The same table was referenced more than once.
+          // So we deduplicate
+          RelShuttle dedup =
+              new DeduplicateCorrelateVariables(rexBuilder,
+                  correlNames.get(0),
+                  ImmutableSet.copyOf(Util.skip(correlNames)));
+          rightRel = rightRel.accept(dedup);
+        }
+        LogicalCorrelate corr = new LogicalCorrelate(
             rightRel.getCluster(),
             leftRel,
             rightRel,
-            joinCond,
-            correlations,
-            joinType);
+            new CorrelationId(correlNames.get(0)),
+            requiredColumns.build(),
+            SemiJoinType.of(joinType));
+        if (!joinCond.isAlwaysTrue()) {
+          return RelOptUtil.createFilter(corr, joinCond);
+        }
+        return corr;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/tools/Programs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/tools/Programs.java b/core/src/main/java/org/apache/calcite/tools/Programs.java
index e9aa502..a16e83c 100644
--- a/core/src/main/java/org/apache/calcite/tools/Programs.java
+++ b/core/src/main/java/org/apache/calcite/tools/Programs.java
@@ -100,6 +100,7 @@ public class Programs {
       ImmutableSet.of(
           EnumerableRules.ENUMERABLE_JOIN_RULE,
           EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
+          EnumerableRules.ENUMERABLE_CORRELATE_RULE,
           EnumerableRules.ENUMERABLE_PROJECT_RULE,
           EnumerableRules.ENUMERABLE_FILTER_RULE,
           EnumerableRules.ENUMERABLE_AGGREGATE_RULE,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index be15c2e..ad83a92 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -23,6 +23,7 @@ import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.interpreter.Row;
 import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.CorrelateJoinType;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.EnumerableDefaults;
 import org.apache.calcite.linq4j.Enumerator;
@@ -123,6 +124,8 @@ public enum BuiltInMethod {
   SLICE0(Enumerables.class, "slice0", Enumerable.class),
   SEMI_JOIN(Enumerables.class, "semiJoin", Enumerable.class, Enumerable.class,
       Function1.class, Function1.class),
+  CORRELATE_JOIN(ExtendedEnumerable.class, "correlateJoin",
+      CorrelateJoinType.class, Function1.class, Function2.class),
   SELECT(ExtendedEnumerable.class, "select", Function1.class),
   SELECT2(ExtendedEnumerable.class, "select", Function2.class),
   SELECT_MANY(ExtendedEnumerable.class, "selectMany", Function1.class),

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java b/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
index 2f69f21..eb04640 100644
--- a/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
+++ b/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
@@ -18,9 +18,9 @@ package org.apache.calcite.util.trace;
 
 import org.apache.calcite.linq4j.function.Function2;
 import org.apache.calcite.linq4j.function.Functions;
+import org.apache.calcite.plan.RelImplementor;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelImplementorImpl;
 
 import java.io.File;
 import java.util.logging.Level;
@@ -96,7 +96,7 @@ public abstract class CalciteTrace {
    * expressions are bound to variables ({@link Level#FINE})
    */
   public static Logger getRelImplementorTracer() {
-    return Logger.getLogger(RelImplementorImpl.class.getName());
+    return Logger.getLogger(RelImplementor.class.getName());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteSuite.java b/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
index f3ad06a..aeeb360 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.test.SqlAdvisorTest;
 import org.apache.calcite.sql.test.SqlOperatorTest;
 import org.apache.calcite.sql.test.SqlPrettyWriterTest;
 import org.apache.calcite.sql.test.SqlTypeNameTest;
+import org.apache.calcite.test.enumerable.EnumerableCorrelateTest;
 import org.apache.calcite.tools.FrameworksTest;
 import org.apache.calcite.tools.PlannerTest;
 import org.apache.calcite.util.BitSetsTest;
@@ -99,6 +100,7 @@ import org.junit.runners.Suite;
     SqlOperatorTest.class,
     ChunkListTest.class,
     FrameworksTest.class,
+    EnumerableCorrelateTest.class,
 
     // slow tests (above 1s)
     PlannerTest.class,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/JdbcTest.java b/core/src/test/java/org/apache/calcite/test/JdbcTest.java
index 426aa78..9b7d947 100644
--- a/core/src/test/java/org/apache/calcite/test/JdbcTest.java
+++ b/core/src/test/java/org/apache/calcite/test/JdbcTest.java
@@ -4176,7 +4176,7 @@ public class JdbcTest {
             new Function<RelNode, Void>() {
               public Void apply(RelNode relNode) {
                 String s = RelOptUtil.toString(relNode);
-                assertThat(s, not(containsString("Correlator")));
+                assertThat(s, not(containsString("Correlate")));
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 76e18ce..62b754a 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -838,6 +838,24 @@ public class SqlToRelConverterTest extends SqlToRelTestBase {
         "${plan}");
   }
 
+  @Test public void testNestedCorrelations() {
+    tester.withDecorrelation(false).assertConvertsTo(
+        "select * from (select 2+deptno d2, 3+deptno d3 from emp) e\n"
+            + " where exists (select 1 from (select deptno+1 d1 from dept) d\n"
+            + " where d1=e.d2 and exists (select 2 from (select deptno+4 d4, deptno+5 d5, deptno+6 d6 from dept)\n"
+            + " where d4=d.d1 and d5=d.d1 and d6=e.d3))",
+        "${plan}");
+  }
+
+  @Test public void testNestedCorrelationsDecorrelated() {
+    tester.withDecorrelation(true).assertConvertsTo(
+        "select * from (select 2+deptno d2, 3+deptno d3 from emp) e\n"
+            + " where exists (select 1 from (select deptno+1 d1 from dept) d\n"
+            + " where d1=e.d2 and exists (select 2 from (select deptno+4 d4, deptno+5 d5, deptno+6 d6 from dept)\n"
+            + " where d4=d.d1 and d5=d.d1 and d6=e.d3))",
+        "${plan}");
+  }
+
   @Test public void testElement() {
     check("select element(multiset[5]) from emp", "${plan}");
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableCorrelateTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableCorrelateTest.java b/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableCorrelateTest.java
new file mode 100644
index 0000000..a725e52
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableCorrelateTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+import org.apache.calcite.util.Bug;
+
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+/**
+ * Tests {@link org.apache.calcite.adapter.enumerable.EnumerableCorrelate}
+ */
+public class EnumerableCorrelateTest {
+  @Test public void simpleCorrelateDecorrelated() {
+    tester(true, new JdbcTest.HrSchema())
+        .query(
+            "select empid, name from emps e where exists (select 1 from depts d where d.deptno=e.deptno)")
+        .explainContains(
+            "EnumerableCalc(expr#0..4=[{inputs}], empid=[$t0], name=[$t2])\n"
+            + "  EnumerableSemiJoin(condition=[=($1, $5)], joinType=[inner])\n"
+            + "    EnumerableTableScan(table=[[s, emps]])\n"
+            + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[true], $f01=[$t0], $f0=[$t4])\n"
+            + "      EnumerableJoin(condition=[=($0, $1)], joinType=[inner])\n"
+            + "        EnumerableAggregate(group=[{0}])\n"
+            + "          EnumerableCalc(expr#0..4=[{inputs}], $f0=[$t1])\n"
+            + "            EnumerableTableScan(table=[[s, emps]])\n"
+            + "        EnumerableTableScan(table=[[s, depts]])")
+        .returnsUnordered(
+            "empid=100; name=Bill",
+            "empid=110; name=Theodore",
+            "empid=150; name=Sebastian");
+
+  }
+
+  @Test public void simpleCorrelate() {
+    tester(false, new JdbcTest.HrSchema())
+        .query(
+            "select empid, name from emps e where exists (select 1 from depts d where d.deptno=e.deptno)")
+        .explainContains(
+            "EnumerableCalc(expr#0..5=[{inputs}], expr#6=[IS NOT NULL($t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+            + "  EnumerableCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{1}])\n"
+            + "    EnumerableTableScan(table=[[s, emps]])\n"
+            + "    EnumerableAggregate(group=[{}], agg#0=[MIN($0)])\n"
+            + "      EnumerableCalc(expr#0..2=[{inputs}], expr#3=[true], expr#4=[$cor0], expr#5=[$t4.deptno], expr#6=[=($t0, $t5)], $f0=[$t3], $condition=[$t6])\n"
+            + "        EnumerableTableScan(table=[[s, depts]])")
+        .returnsUnordered(
+            "empid=100; name=Bill",
+            "empid=110; name=Theodore",
+            "empid=150; name=Sebastian");
+
+  }
+
+  private CalciteAssert.AssertThat tester(final boolean forceDecorrelate,
+      final Object schema) {
+    Bug.remark(
+        "CALCITE-489 - Teach CalciteAssert to respect multiple settings");
+    final Properties p = new Properties();
+    p.setProperty("lex", "JAVA");
+    p.setProperty("forceDecorrelate", Boolean.toString(forceDecorrelate));
+    return CalciteAssert.that()
+        .with(new CalciteAssert.ConnectionFactory() {
+          public CalciteConnection createConnection() throws Exception {
+            Connection connection =
+                DriverManager.getConnection("jdbc:calcite:", p);
+            CalciteConnection calciteConnection =
+                connection.unwrap(CalciteConnection.class);
+            SchemaPlus rootSchema =
+                calciteConnection.getRootSchema();
+            rootSchema.add("s", new ReflectiveSchema(schema));
+            calciteConnection.setSchema("s");
+            return calciteConnection;
+          }
+        });
+  }
+}
+
+// End EnumerableCorrelateTest.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/enumerable/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/enumerable/package-info.java b/core/src/test/java/org/apache/calcite/test/enumerable/package-info.java
new file mode 100644
index 0000000..0b3113d
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/test/enumerable/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests for Enumerable convention runtime.
+ */
+package org.apache.calcite.test.enumerable;
+
+// End package-info.java