You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by vl...@apache.org on 2014/12/12 08:27:03 UTC

[2/3] incubator-calcite git commit: [CALCITE-483][CALCITE-489] Update Correlate mechanics and implement EnumerableCorrelate (aka nested loops join)

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java b/core/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java
new file mode 100644
index 0000000..e5d3bd7
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalCorrelate.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/**
+ * A relational operator that performs nested-loop joins.
+ *
+ * <p>It behaves like a kind of {@link org.apache.calcite.rel.core.Join},
+ * but works by setting variables in its environment and restarting its
+ * right-hand input.
+ *
+ * <p>A LogicalCorrelate is used to represent a correlated query. One
+ * implementation strategy is to de-correlate the expression.
+ *
+ * @see org.apache.calcite.rel.core.CorrelationId
+ */
+public final class LogicalCorrelate extends Correlate {
+  //~ Instance fields --------------------------------------------------------
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a LogicalCorrelate.
+   * @param cluster      cluster this relational expression belongs to
+   * @param left         left input relational expression
+   * @param right        right input relational expression
+   * @param correlationId variable name for the row of left input
+   * @param requiredColumns
+   * @param joinType     join type
+   */
+  public LogicalCorrelate(
+      RelOptCluster cluster,
+      RelNode left,
+      RelNode right,
+      CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    super(
+        cluster,
+        cluster.traitSetOf(Convention.NONE),
+        left,
+        right,
+        correlationId,
+        requiredColumns,
+        joinType);
+  }
+
+  /**
+   * Creates a LogicalCorrelate by parsing serialized output.
+   */
+  public LogicalCorrelate(RelInput input) {
+    this(
+        input.getCluster(), input.getInputs().get(0),
+        input.getInputs().get(1),
+        new CorrelationId((Integer) input.get("correlationId")),
+        input.getBitSet("requiredColumns"),
+        input.getEnum("joinType", SemiJoinType.class));
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  @Override public LogicalCorrelate copy(RelTraitSet traitSet,
+      RelNode left, RelNode right, CorrelationId correlationId,
+      ImmutableBitSet requiredColumns, SemiJoinType joinType) {
+    assert traitSet.containsIfApplicable(Convention.NONE);
+    return new LogicalCorrelate(
+        getCluster(),
+        left,
+        right,
+        correlationId,
+        requiredColumns,
+        joinType);
+  }
+
+  @Override public RelNode accept(RelShuttle shuttle) {
+    return shuttle.visit(this);
+  }
+}
+
+// End LogicalCorrelate.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java
index 6a6fab6..5128884 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdColumnUniqueness.java
@@ -18,7 +18,7 @@ package org.apache.calcite.rel.metadata;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Correlator;
+import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
@@ -72,7 +72,7 @@ public class RelMdColumnUniqueness {
   }
 
   public Boolean areColumnsUnique(
-      Correlator rel,
+      Correlate rel,
       ImmutableBitSet columns,
       boolean ignoreNulls) {
     return RelMetadataQuery.areColumnsUnique(

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
index a84a94e..d6a0275 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
@@ -18,7 +18,7 @@ package org.apache.calcite.rel.metadata;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Aggregate;
-import org.apache.calcite.rel.core.Correlator;
+import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
@@ -62,7 +62,7 @@ public class RelMdUniqueKeys {
     return RelMetadataQuery.getUniqueKeys(rel.getInput(), ignoreNulls);
   }
 
-  public Set<ImmutableBitSet> getUniqueKeys(Correlator rel,
+  public Set<ImmutableBitSet> getUniqueKeys(Correlate rel,
       boolean ignoreNulls) {
     return RelMetadataQuery.getUniqueKeys(rel.getLeft(), ignoreNulls);
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java b/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
new file mode 100644
index 0000000..bcdd0aa
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+
+/**
+ * Rule that converts a {@link org.apache.calcite.rel.logical.LogicalJoin}
+ * into a {@link org.apache.calcite.rel.logical.LogicalCorrelate}, which can
+ * then be implemented using nested loops.
+ *
+ * <p>For example,</p>
+ *
+ * <blockquote><code>select * from emp join dept on emp.deptno =
+ * dept.deptno</code></blockquote>
+ *
+ * <p>becomes a Correlator which restarts LogicalTableScan("DEPT") for each
+ * row read from LogicalTableScan("EMP").</p>
+ *
+ * <p>This rule is not applicable if for certain types of outer join. For
+ * example,</p>
+ *
+ * <blockquote><code>select * from emp right join dept on emp.deptno =
+ * dept.deptno</code></blockquote>
+ *
+ * <p>would require emitting a NULL emp row if a certain department contained no
+ * employees, and Correlator cannot do that.</p>
+ */
+public class JoinToCorrelateRule extends RelOptRule {
+  //~ Static fields/initializers ---------------------------------------------
+
+  public static final JoinToCorrelateRule INSTANCE =
+      new JoinToCorrelateRule(RelFactories.DEFAULT_FILTER_FACTORY);
+
+  protected final RelFactories.FilterFactory filterFactory;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Private constructor; use singleton {@link #INSTANCE}.
+   */
+  protected JoinToCorrelateRule(RelFactories.FilterFactory filterFactory) {
+    super(operand(LogicalJoin.class, any()));
+    this.filterFactory = filterFactory;
+    assert filterFactory != null : "Filter factory should not be null";
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  public boolean matches(RelOptRuleCall call) {
+    LogicalJoin join = call.rel(0);
+    switch (join.getJoinType()) {
+    case INNER:
+    case LEFT:
+      return true;
+    case FULL:
+    case RIGHT:
+      return false;
+    default:
+      throw Util.unexpected(join.getJoinType());
+    }
+  }
+
+  public void onMatch(RelOptRuleCall call) {
+    assert matches(call);
+    final LogicalJoin join = call.rel(0);
+    RelNode right = join.getRight();
+    final RelNode left = join.getLeft();
+    final int leftFieldCount = left.getRowType().getFieldCount();
+    final RelOptCluster cluster = join.getCluster();
+    final RexBuilder rexBuilder = cluster.getRexBuilder();
+    final String dynInIdStr = cluster.getQuery().createCorrel();
+    final CorrelationId correlationId = new CorrelationId(dynInIdStr);
+    final RexNode corrVar =
+        rexBuilder.makeCorrel(left.getRowType(), correlationId.getName());
+    final ImmutableBitSet.Builder requiredColumns = ImmutableBitSet.builder();
+    RexNode joinCondition = join.getCondition();
+
+    // Replace all references of left input with FieldAccess(corrVar, field)
+    joinCondition = joinCondition.accept(new RexShuttle() {
+      @Override
+      public RexNode visitInputRef(RexInputRef input) {
+        int field = input.getIndex();
+        if (field >= leftFieldCount) {
+          return rexBuilder.makeInputRef(input.getType(),
+              input.getIndex() - leftFieldCount);
+        }
+        requiredColumns.set(field);
+        return rexBuilder.makeFieldAccess(corrVar, field);
+      }
+    });
+
+    joinCondition = RexUtil.flatten(rexBuilder, joinCondition);
+    final RelNode filteredRight =
+        RelOptUtil.createFilter(right, joinCondition, filterFactory);
+    RelNode newRel =
+        new LogicalCorrelate(
+            join.getCluster(),
+            left,
+            filteredRight,
+            correlationId,
+            requiredColumns.build(),
+            SemiJoinType.of(join.getJoinType()));
+    call.transformTo(newRel);
+  }
+}
+
+// End JoinToCorrelateRule.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelatorRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelatorRule.java b/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelatorRule.java
deleted file mode 100644
index 219b662..0000000
--- a/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelatorRule.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.rel.rules;
-
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptQuery;
-import org.apache.calcite.plan.RelOptRule;
-import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Correlation;
-import org.apache.calcite.rel.core.Correlator;
-import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.logical.LogicalJoin;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.util.Util;
-import org.apache.calcite.util.mapping.IntPair;
-
-import com.google.common.collect.Lists;
-
-import java.util.List;
-
-/**
- * Rule that converts a {@link org.apache.calcite.rel.logical.LogicalJoin}
- * into a {@link org.apache.calcite.rel.core.Correlator}, which can
- * then be implemented using nested loops.
- *
- * <p>For example,</p>
- *
- * <blockquote><code>select * from emp join dept on emp.deptno =
- * dept.deptno</code></blockquote>
- *
- * <p>becomes a Correlator which restarts LogicalTableScan("DEPT") for each
- * row read from LogicalTableScan("EMP").</p>
- *
- * <p>This rule is not applicable if for certain types of outer join. For
- * example,</p>
- *
- * <blockquote><code>select * from emp right join dept on emp.deptno =
- * dept.deptno</code></blockquote>
- *
- * <p>would require emitting a NULL emp row if a certain department contained no
- * employees, and Correlator cannot do that.</p>
- */
-public class JoinToCorrelatorRule extends RelOptRule {
-  //~ Static fields/initializers ---------------------------------------------
-
-  public static final JoinToCorrelatorRule INSTANCE =
-      new JoinToCorrelatorRule();
-
-  //~ Constructors -----------------------------------------------------------
-
-  /**
-   * Private constructor; use singleton {@link #INSTANCE}.
-   */
-  private JoinToCorrelatorRule() {
-    super(operand(LogicalJoin.class, any()));
-  }
-
-  //~ Methods ----------------------------------------------------------------
-
-  public boolean matches(RelOptRuleCall call) {
-    LogicalJoin join = call.rel(0);
-    switch (join.getJoinType()) {
-    case INNER:
-    case LEFT:
-      return true;
-    case FULL:
-    case RIGHT:
-      return false;
-    default:
-      throw Util.unexpected(join.getJoinType());
-    }
-  }
-
-  public void onMatch(RelOptRuleCall call) {
-    assert matches(call);
-    final LogicalJoin join = call.rel(0);
-    RelNode right = join.getRight();
-    final RelNode left = join.getLeft();
-    final JoinInfo joinInfo = join.analyzeCondition();
-    final List<Correlation> correlationList = Lists.newArrayList();
-    final RelOptCluster cluster = join.getCluster();
-    final RexBuilder rexBuilder = cluster.getRexBuilder();
-    final List<RexNode> conditions = Lists.newArrayList();
-    for (IntPair p : joinInfo.pairs()) {
-      final String dynInIdStr = cluster.getQuery().createCorrel();
-      final int dynInId = RelOptQuery.getCorrelOrdinal(dynInIdStr);
-
-      // Create correlation to say 'each row, set variable #id
-      // to the value of column #leftKey'.
-      correlationList.add(new Correlation(dynInId, p.source));
-      conditions.add(
-          rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
-              rexBuilder.makeInputRef(right, p.target),
-              rexBuilder.makeCorrel(
-                  left.getRowType().getFieldList().get(p.source).getType(),
-                  dynInIdStr)));
-    }
-    final RelNode filteredRight = RelOptUtil.createFilter(right, conditions);
-    RelNode newRel =
-        new Correlator(
-            join.getCluster(),
-            left,
-            filteredRight,
-            joinInfo.getRemaining(join.getCluster().getRexBuilder()),
-            correlationList,
-            join.getJoinType());
-    call.transformTo(newRel);
-  }
-}
-
-// End JoinToCorrelatorRule.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java
index 5379110..6c275a2 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/ProjectJoinTransposeRule.java
@@ -20,6 +20,7 @@ import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexNode;
@@ -67,6 +68,9 @@ public class ProjectJoinTransposeRule extends RelOptRule {
     LogicalProject origProj = call.rel(0);
     final Join join = call.rel(1);
 
+    if (join instanceof SemiJoin) {
+      return; // TODO: support SemiJoin
+    }
     // locate all fields referenced in the projection and join condition;
     // determine which inputs are referenced in the projection and
     // join condition; if all fields are being referenced and there are no

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java b/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
index cfaf740..867cf0e 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/PushProjector.java
@@ -20,6 +20,7 @@ import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.SemiJoin;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -213,7 +214,7 @@ public class PushProjector {
       List<RelDataTypeField> rightFields =
           joinRel.getRight().getRowType().getFieldList();
       nFields = leftFields.size();
-      nFieldsRight = rightFields.size();
+      nFieldsRight = childRel instanceof SemiJoin ? 0 : rightFields.size();
       nSysFields = joinRel.getSystemFieldList().size();
       childBitmap =
           ImmutableBitSet.range(nSysFields, nFields + nSysFields);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java b/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
index c795c23..9825367 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexExecutorImpl.java
@@ -77,7 +77,7 @@ public class RexExecutorImpl implements RelOptPlanner.Executor {
             Expressions.convert_(root0_, DataContext.class)));
     final List<Expression> expressions =
         RexToLixTranslator.translateProjects(programBuilder.getProgram(),
-        javaTypeFactory, blockBuilder, null, getter);
+        javaTypeFactory, blockBuilder, null, getter, null);
     blockBuilder.add(
         Expressions.return_(null,
             Expressions.newArrayInit(Object[].class, expressions)));

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql/SemiJoinType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SemiJoinType.java b/core/src/main/java/org/apache/calcite/sql/SemiJoinType.java
new file mode 100644
index 0000000..50b3659
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/SemiJoinType.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql;
+
+import org.apache.calcite.linq4j.CorrelateJoinType;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+/**
+ * Enumeration representing different join types used in correlation
+ * relations.
+ */
+public enum SemiJoinType implements SqlLiteral.SqlSymbol {
+  /**
+   * Inner join
+   */
+  INNER,
+
+  /**
+   * Left-outer join
+   */
+  LEFT,
+
+  /**
+   * Semi-join
+   * <p>Similar to from A ... where a in (select b from B ...)</p>
+   */
+  SEMI,
+
+  /**
+   * Anti-join
+   * <p>Similar to from A ... where a NOT in (select b from B ...)</p>
+   * <p>Note: if B.b is nullable and B has nulls, no rows must be returned</p>
+   */
+  ANTI;
+
+  /**
+   * Creates a parse-tree node representing an occurrence of this
+   * condition type keyword at a particular position in the parsed
+   * text.
+   */
+  public SqlLiteral symbol(SqlParserPos pos) {
+    return SqlLiteral.createSymbol(this, pos);
+  }
+
+  public static SemiJoinType of(JoinRelType joinType) {
+    switch (joinType) {
+    case INNER:
+      return INNER;
+    case LEFT:
+      return LEFT;
+    }
+    throw new IllegalArgumentException(
+        "Unsupported join type for semi-join " + joinType);
+  }
+
+  public JoinRelType toJoinType() {
+    switch (this) {
+    case INNER:
+      return JoinRelType.INNER;
+    case LEFT:
+      return JoinRelType.LEFT;
+    }
+    throw new IllegalStateException(
+        "Unable to convert " + this + " to JoinRelType");
+  }
+
+  public CorrelateJoinType toLinq4j() {
+    switch (this) {
+    case INNER:
+      return CorrelateJoinType.INNER;
+    case LEFT:
+      return CorrelateJoinType.LEFT;
+    case SEMI:
+      return CorrelateJoinType.SEMI;
+    case ANTI:
+      return CorrelateJoinType.ANTI;
+    }
+    throw new IllegalStateException(
+        "Unable to convert " + this + " to JoinRelType");
+  }
+
+  public boolean returnsJustFirstInput() {
+    switch (this) {
+    case INNER:
+    case LEFT:
+      return false;
+    case SEMI:
+    case ANTI:
+      return true;
+    }
+    throw new IllegalStateException(
+        "Unable to convert " + this + " to JoinRelType");
+  }
+}
+
+// End SemiJoinType.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java b/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
new file mode 100644
index 0000000..206ed2b
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql2rel;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Rewrites relations to ensure the same correlation is referenced by the same
+ * correlation variable.
+ */
+public class DeduplicateCorrelateVariables extends RelShuttleImpl {
+  private final RexShuttle dedupRex;
+
+  /**
+   * Replaces alternative names of correlation variable to its canonical name.
+   */
+  private static class DeduplicateCorrelateVariablesShuttle extends RexShuttle {
+    private final RexBuilder builder;
+    private final String canonical;
+    private final Set<String> altNames;
+
+    public DeduplicateCorrelateVariablesShuttle(RexBuilder builder,
+        String canonical, Set<String> altNames) {
+      this.builder = builder;
+      this.canonical = canonical;
+      this.altNames = altNames;
+    }
+
+    @Override
+    public RexNode visitCorrelVariable(RexCorrelVariable variable) {
+      if (!altNames.contains(variable.getName())) {
+        return variable;
+      }
+
+      return builder.makeCorrel(variable.getType(), canonical);
+    }
+  }
+
+  public DeduplicateCorrelateVariables(RexBuilder builder,
+      String canonical, Set<String> altNames) {
+    dedupRex = new DeduplicateCorrelateVariablesShuttle(builder,
+        canonical, altNames);
+  }
+
+  @Override
+  public RelNode visit(LogicalFilter filter) {
+    LogicalFilter newFilter = (LogicalFilter) super.visit(filter);
+    RexNode condition = filter.getCondition();
+    RexNode newCondition = condition.accept(dedupRex);
+    if (condition != newCondition) {
+      return newFilter.copy(newFilter.getTraitSet(), newFilter.getInput(),
+          newCondition);
+    }
+    return newFilter;
+  }
+
+  @Override
+  public RelNode visit(LogicalProject project) {
+    LogicalProject project2 = (LogicalProject) super.visit(project);
+    List<RexNode> childExps = project2.getChildExps();
+    List<RexNode> newExps = dedupRex.apply(childExps);
+    if (childExps != newExps) {
+      return project2.copy(project2.getTraitSet(), project2.getInput(),
+          newExps, project2.getRowType());
+    }
+    return project2;
+  }
+
+  @Override
+  public RelNode visit(LogicalJoin join) {
+    LogicalJoin join2 = (LogicalJoin) super.visit(join);
+    RexNode condition = join2.getCondition();
+    RexNode newCondition = condition.accept(dedupRex);
+    if (condition != newCondition) {
+      return join2.copy(join2.getTraitSet(), newCondition, join2.getLeft(),
+          join2.getRight(), join2.getJoinType(), join2.isSemiJoinDone());
+    }
+    return join2;
+  }
+}
+
+// End DeduplicateCorrelateVariables.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index 5f2ef57..bff404f 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -28,18 +28,18 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.hep.HepPlanner;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttleImpl;
 import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.core.Correlation;
-import org.apache.calcite.rel.core.Correlator;
-import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
@@ -149,7 +149,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
   private final Map<RelNode, Map<Integer, Integer>>
   mapNewRelToMapOldToNewOutputPos = Maps.newHashMap();
 
-  private final HashSet<Correlator> generatedCorRels = Sets.newHashSet();
+  private final HashSet<LogicalCorrelate> generatedCorRels = Sets.newHashSet();
 
   //~ Constructors -----------------------------------------------------------
 
@@ -173,7 +173,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * @param rootRel Root node of the query
    *
    * @return Equivalent query with all
-   * {@link org.apache.calcite.rel.core.Correlator} instances removed
+   * {@link org.apache.calcite.rel.logical.LogicalCorrelate} instances removed
    */
   public static RelNode decorrelateQuery(RelNode rootRel) {
     final CorelMap corelMap = CorelMap.build(rootRel);
@@ -206,7 +206,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     return newRootRel;
   }
 
-  private void setCurrent(RelNode root, Correlator corRel) {
+  private void setCurrent(RelNode root, LogicalCorrelate corRel) {
     currentRel = corRel;
     if (corRel != null) {
       cm = CorelMap.build(Util.first(root, corRel));
@@ -249,17 +249,16 @@ public class RelDecorrelator implements ReflectiveVisitor {
           cm.mapRefRelToCorVar.putAll(newNode,
               cm.mapRefRelToCorVar.get(oldNode));
         }
-        if (oldNode instanceof Correlator
-            && newNode instanceof Correlator) {
-          Correlator oldCor = (Correlator) oldNode;
-          for (Correlation c : oldCor.getCorrelations()) {
-            if (cm.mapCorVarToCorRel.get(c) == oldNode) {
-              cm.mapCorVarToCorRel.put(c, (Correlator) newNode);
-            }
+        if (oldNode instanceof LogicalCorrelate
+            && newNode instanceof LogicalCorrelate) {
+          LogicalCorrelate oldCor = (LogicalCorrelate) oldNode;
+          CorrelationId c = oldCor.getCorrelationId();
+          if (cm.mapCorVarToCorRel.get(c) == oldNode) {
+            cm.mapCorVarToCorRel.put(c, (LogicalCorrelate) newNode);
           }
 
           if (generatedCorRels.contains(oldNode)) {
-            generatedCorRels.add((Correlator) newNode);
+            generatedCorRels.add((LogicalCorrelate) newNode);
           }
         }
         return null;
@@ -716,9 +715,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
     // inputRel provides the definition of a correlated variable.
     // Add to map all the referenced positions(relative to each input rel)
     for (Correlation corVar : correlations) {
-      int oldCorVarOffset = corVar.getOffset();
+      int oldCorVarOffset = corVar.field;
 
-      oldInputRel = cm.mapCorVarToCorRel.get(corVar).getInput(0);
+      oldInputRel = cm.mapCorVarToCorRel.get(corVar.corr).getInput(0);
       assert oldInputRel != null;
       newInputRel = mapOldToNewRel.get(oldInputRel);
       assert newInputRel != null;
@@ -753,7 +752,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     Set<RelNode> joinedInputRelSet = Sets.newHashSet();
 
     for (Correlation corVar : correlations) {
-      oldInputRel = cm.mapCorVarToCorRel.get(corVar).getInput(0);
+      oldInputRel = cm.mapCorVarToCorRel.get(corVar.corr).getInput(0);
       assert oldInputRel != null;
       newInputRel = mapOldToNewRel.get(oldInputRel);
       assert newInputRel != null;
@@ -795,17 +794,17 @@ public class RelDecorrelator implements ReflectiveVisitor {
       // The first child of a correlatorRel is always the rel defining
       // the correlated variables.
       newInputRel =
-          mapOldToNewRel.get(cm.mapCorVarToCorRel.get(corVar).getInput(0));
+          mapOldToNewRel.get(cm.mapCorVarToCorRel.get(corVar.corr).getInput(0));
       newLocalOutputPosList = mapNewInputRelToOutputPos.get(newInputRel);
 
       Map<Integer, Integer> mapOldToNewOutputPos =
           mapNewRelToMapOldToNewOutputPos.get(newInputRel);
       assert mapOldToNewOutputPos != null;
 
-      newLocalOutputPos = mapOldToNewOutputPos.get(corVar.getOffset());
+      newLocalOutputPos = mapOldToNewOutputPos.get(corVar.field);
 
       // newOutputPos is the index of the cor var in the referenced
-      // position list plus the offset of referenced postition list of
+      // position list plus the offset of referenced position list of
       // each newInputRel.
       newOutputPos =
           newLocalOutputPosList.indexOf(newLocalOutputPos)
@@ -948,7 +947,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
    *
    * @param rel Correlator
    */
-  public void decorrelateRel(Correlator rel) {
+  public void decorrelateRel(LogicalCorrelate rel) {
     //
     // Rewrite logic:
     //
@@ -989,13 +988,13 @@ public class RelDecorrelator implements ReflectiveVisitor {
     SortedMap<Correlation, Integer> mapCorVarToOutputPos =
         rightChildMapCorVarToOutputPos;
 
-    assert rel.getCorrelations().size()
+    assert rel.getRequiredColumns().cardinality()
         <= rightChildMapCorVarToOutputPos.keySet().size();
 
     // Change correlator rel into a join.
     // Join all the correlated variables produced by this correlator rel
     // with the values generated and propagated from the right input
-    RexNode condition = rel.getCondition();
+    RexNode condition = rexBuilder.makeLiteral(true);
     final List<RelDataTypeField> newLeftOutput =
         newLeftRel.getRowType().getFieldList();
     int newLeftFieldCount = newLeftOutput.size();
@@ -1005,8 +1004,13 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     int newLeftPos;
     int newRightPos;
-    for (Correlation corVar : rel.getCorrelations()) {
-      newLeftPos = leftChildMapOldToNewOutputPos.get(corVar.getOffset());
+    for (Map.Entry<Correlation, Integer> rightOutputPos
+        : Lists.newArrayList(rightChildMapCorVarToOutputPos.entrySet())) {
+      Correlation corVar = rightOutputPos.getKey();
+      if (!corVar.corr.equals(rel.getCorrelationId())) {
+        continue;
+      }
+      newLeftPos = leftChildMapOldToNewOutputPos.get(corVar.field);
       newRightPos = rightChildMapCorVarToOutputPos.get(corVar);
       RexNode equi =
           rexBuilder.makeCall(
@@ -1070,7 +1074,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
             newLeftRel,
             newRightRel,
             condition,
-            rel.getJoinType(),
+            rel.getJoinType().toJoinType(),
             variablesStopped);
 
     mapOldToNewRel.put(rel, newRel);
@@ -1288,11 +1292,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * @return the subtree with the new LogicalProject at the root
    */
   private RelNode aggregateCorrelatorOutput(
-      Correlator corRel,
+      LogicalCorrelate corRel,
       LogicalProject projRel,
       Set<Integer> isCount) {
     RelNode leftInputRel = corRel.getLeft();
-    JoinRelType joinType = corRel.getJoinType();
+    JoinRelType joinType = corRel.getJoinType().toJoinType();
 
     // now create the new project
     List<Pair<RexNode, String>> newProjects = Lists.newArrayList();
@@ -1335,7 +1339,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * @return true if filter and proj only references corVar provided by corRel
    */
   private boolean checkCorVars(
-      Correlator corRel,
+      LogicalCorrelate corRel,
       LogicalProject projRel,
       LogicalFilter filter,
       List<RexFieldAccess> correlatedJoinKeys) {
@@ -1361,7 +1365,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       corVarInFilter.addAll(cm.mapRefRelToCorVar.get(filter));
 
       for (Correlation corVar : corVarInFilter) {
-        if (cm.mapCorVarToCorRel.get(corVar) != corRel) {
+        if (cm.mapCorVarToCorRel.get(corVar.corr) != corRel) {
           return false;
         }
       }
@@ -1372,7 +1376,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     // of the corRel.
     if ((projRel != null) && cm.mapRefRelToCorVar.containsKey(projRel)) {
       for (Correlation corVar : cm.mapRefRelToCorVar.get(projRel)) {
-        if (cm.mapCorVarToCorRel.get(corVar) != corRel) {
+        if (cm.mapCorVarToCorRel.get(corVar.corr) != corRel) {
           return false;
         }
       }
@@ -1386,11 +1390,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
    *
    * @param corRel Correlator
    */
-  private void removeCorVarFromTree(Correlator corRel) {
-    for (Correlation c : Lists.newArrayList(cm.mapCorVarToCorRel.keySet())) {
-      if (cm.mapCorVarToCorRel.get(c) == corRel) {
-        cm.mapCorVarToCorRel.remove(c);
-      }
+  private void removeCorVarFromTree(LogicalCorrelate corRel) {
+    if (cm.mapCorVarToCorRel.get(corRel.getCorrelationId()) == corRel) {
+      cm.mapCorVarToCorRel.remove(corRel.getCorrelationId());
     }
   }
 
@@ -1618,7 +1620,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
         // corVar offset should point to the leftInput of currentRel,
         // which is the Correlator.
         RexNode newRexNode =
-            new RexInputRef(corVar.getOffset(), fieldAccess.getType());
+            new RexInputRef(corVar.field, fieldAccess.getType());
 
         if (projectPulledAboveLeftCorrelator
             && (nullIndicator != null)) {
@@ -1637,14 +1639,14 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     // override RexShuttle
     public RexNode visitInputRef(RexInputRef inputRef) {
-      if ((currentRel != null) && (currentRel instanceof Correlator)) {
+      if ((currentRel != null) && (currentRel instanceof LogicalCorrelate)) {
         // if this rel references corVar
         // and now it needs to be rewritten
         // it must have been pulled above the Correlator
         // replace the input ref to account for the LHS of the
         // Correlator
         int leftInputFieldCount =
-            ((Correlator) currentRel).getLeft().getRowType()
+            ((LogicalCorrelate) currentRel).getLeft().getRowType()
                 .getFieldCount();
         RelDataType newType = inputRef.getType();
 
@@ -1803,7 +1805,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
   private final class RemoveCorrelationForScalarProjectRule extends RelOptRule {
     public RemoveCorrelationForScalarProjectRule() {
       super(
-          operand(Correlator.class,
+          operand(LogicalCorrelate.class,
               operand(RelNode.class, any()),
               operand(LogicalAggregate.class,
                   operand(LogicalProject.class,
@@ -1811,7 +1813,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     }
 
     public void onMatch(RelOptRuleCall call) {
-      Correlator corRel = call.rel(0);
+      LogicalCorrelate corRel = call.rel(0);
       RelNode leftInputRel = call.rel(1);
       LogicalAggregate aggRel = call.rel(2);
       LogicalProject projRel = call.rel(3);
@@ -1829,8 +1831,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
       //   LogicalAggregate (groupby (0) single_value())
       //     LogicalProject-A (may reference coVar)
       //       RightInputRel
-      JoinRelType joinType = corRel.getJoinType();
-      RexNode joinCond = corRel.getCondition();
+      JoinRelType joinType = corRel.getJoinType().toJoinType();
+
+      // corRel.getCondition was here, however Correlate was updated so it
+      // never includes a join condition. The code was not modified for brevity.
+      RexNode joinCond = rexBuilder.makeLiteral(true);
       if ((joinType != JoinRelType.LEFT)
           || (joinCond != rexBuilder.makeLiteral(true))) {
         return;
@@ -2004,7 +2009,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
       extends RelOptRule {
     public RemoveCorrelationForScalarAggregateRule() {
       super(
-          operand(Correlator.class,
+          operand(LogicalCorrelate.class,
               operand(RelNode.class, any()),
               operand(LogicalProject.class,
                   operand(LogicalAggregate.class, null, Aggregate.IS_SIMPLE,
@@ -2013,7 +2018,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     }
 
     public void onMatch(RelOptRuleCall call) {
-      Correlator corRel = call.rel(0);
+      LogicalCorrelate corRel = call.rel(0);
       RelNode leftInputRel = call.rel(1);
       LogicalProject aggOutputProjRel = call.rel(2);
       LogicalAggregate aggRel = call.rel(3);
@@ -2040,8 +2045,10 @@ public class RelDecorrelator implements ReflectiveVisitor {
         return;
       }
 
-      JoinRelType joinType = corRel.getJoinType();
-      RexNode joinCond = corRel.getCondition();
+      JoinRelType joinType = corRel.getJoinType().toJoinType();
+      // corRel.getCondition was here, however Correlate was updated so it
+      // never includes a join condition. The code was not modified for brevity.
+      RexNode joinCond = rexBuilder.makeLiteral(true);
       if ((joinType != JoinRelType.LEFT)
           || (joinCond != rexBuilder.makeLiteral(true))) {
         return;
@@ -2389,18 +2396,18 @@ public class RelDecorrelator implements ReflectiveVisitor {
     public AdjustProjectForCountAggregateRule(boolean flavor) {
       super(
           flavor
-              ? operand(Correlator.class,
+              ? operand(LogicalCorrelate.class,
                   operand(RelNode.class, any()),
                       operand(LogicalProject.class,
                           operand(LogicalAggregate.class, any())))
-              : operand(Correlator.class,
+              : operand(LogicalCorrelate.class,
                   operand(RelNode.class, any()),
                       operand(LogicalAggregate.class, any())));
       this.flavor = flavor;
     }
 
     public void onMatch(RelOptRuleCall call) {
-      Correlator corRel = call.rel(0);
+      LogicalCorrelate corRel = call.rel(0);
       RelNode leftInputRel = call.rel(1);
       LogicalProject aggOutputProjRel;
       LogicalAggregate aggRel;
@@ -2428,7 +2435,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     private void onMatch2(
         RelOptRuleCall call,
-        Correlator corRel,
+        LogicalCorrelate corRel,
         RelNode leftInputRel,
         LogicalProject aggOutputProjRel,
         LogicalAggregate aggRel) {
@@ -2457,8 +2464,10 @@ public class RelDecorrelator implements ReflectiveVisitor {
         return;
       }
 
-      JoinRelType joinType = corRel.getJoinType();
-      RexNode joinCond = corRel.getCondition();
+      JoinRelType joinType = corRel.getJoinType().toJoinType();
+      // corRel.getCondition was here, however Correlate was updated so it
+      // never includes a join condition. The code was not modified for brevity.
+      RexNode joinCond = rexBuilder.makeLiteral(true);
       if ((joinType != JoinRelType.LEFT)
           || (joinCond != rexBuilder.makeLiteral(true))) {
         return;
@@ -2489,12 +2498,13 @@ public class RelDecorrelator implements ReflectiveVisitor {
       //     LeftInputRel
       //     LogicalAggregate (groupby (0), agg0(), agg1()...)
       //
-      Correlator newCorRel =
-          new Correlator(
+      LogicalCorrelate newCorRel =
+          new LogicalCorrelate(
               cluster,
               leftInputRel,
               aggRel,
-              corRel.getCorrelations(),
+              corRel.getCorrelationId(),
+              corRel.getRequiredColumns(),
               corRel.getJoinType());
 
       // remember this rel so we don't fire rule on it again
@@ -2506,10 +2516,8 @@ public class RelDecorrelator implements ReflectiveVisitor {
       // need to update the mapCorVarToCorRel Update the output position
       // for the cor vars: only pass on the cor vars that are not used in
       // the join key.
-      for (Correlation c : Lists.newArrayList(cm.mapCorVarToCorRel.keySet())) {
-        if (cm.mapCorVarToCorRel.get(c) == corRel) {
-          cm.mapCorVarToCorRel.put(c, newCorRel);
-        }
+      if (cm.mapCorVarToCorRel.get(corRel.getCorrelationId()) == corRel) {
+        cm.mapCorVarToCorRel.put(corRel.getCorrelationId(), newCorRel);
       }
 
       RelNode newOutputRel =
@@ -2519,7 +2527,39 @@ public class RelDecorrelator implements ReflectiveVisitor {
     }
   }
 
-  /** A map of the locations of {@link org.apache.calcite.rel.core.Correlator}
+  /**
+   * {@code Correlation} here represents a unique reference to a correlation
+   * field.
+   * For instance, if a RelNode references emp.name multiple times, it would
+   * result in multiple {@code Correlation} objects that differ just in
+   * {@link Correlation#uniqueKey}.
+   */
+  static class Correlation
+      implements Comparable<Correlation> {
+    public final int uniqueKey;
+    public final CorrelationId corr;
+    public final int field;
+
+    Correlation(CorrelationId corr, int field, int uniqueKey) {
+      this.corr = corr;
+      this.field = field;
+      this.uniqueKey = uniqueKey;
+    }
+
+    public int compareTo(Correlation o) {
+      int res = corr.compareTo(o.corr);
+      if (res != 0) {
+        return res;
+      }
+      if (field != o.field) {
+        return field - o.field;
+      }
+      return uniqueKey - o.uniqueKey;
+    }
+  }
+
+  /** A map of the locations of
+   * {@link org.apache.calcite.rel.logical.LogicalCorrelate}
    * in a tree of {@link RelNode}s.
    *
    * <p>It is used to drive the decorrelation process.
@@ -2541,12 +2581,12 @@ public class RelDecorrelator implements ReflectiveVisitor {
    * </ol> */
   private static class CorelMap {
     private final Multimap<RelNode, Correlation> mapRefRelToCorVar;
-    private final SortedMap<Correlation, Correlator> mapCorVarToCorRel;
+    private final SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel;
     private final Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar;
 
     // TODO: create immutable copies of all maps
     private CorelMap(Multimap<RelNode, Correlation> mapRefRelToCorVar,
-        SortedMap<Correlation, Correlator> mapCorVarToCorRel,
+        SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel,
         Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar) {
       this.mapRefRelToCorVar = mapRefRelToCorVar;
       this.mapCorVarToCorRel = mapCorVarToCorRel;
@@ -2578,7 +2618,7 @@ public class RelDecorrelator implements ReflectiveVisitor {
     /** Creates a CorelMap with given contents. */
     public static CorelMap of(
         SortedSetMultimap<RelNode, Correlation> mapRefRelToCorVar,
-        SortedMap<Correlation, Correlator> mapCorVarToCorRel,
+        SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel,
         Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar) {
       return new CorelMap(mapRefRelToCorVar, mapCorVarToCorRel,
           mapFieldAccessToCorVar);
@@ -2586,8 +2626,8 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
     /** Creates a CorelMap by iterating over a {@link RelNode} tree. */
     public static CorelMap build(RelNode rel) {
-      final SortedMap<Correlation, Correlator> mapCorVarToCorRel =
-          new TreeMap<Correlation, Correlator>();
+      final SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel =
+          new TreeMap<CorrelationId, LogicalCorrelate>();
 
       final SortedSetMultimap<RelNode, Correlation> mapRefRelToCorVar =
           Multimaps.newSortedSetMultimap(
@@ -2602,11 +2642,11 @@ public class RelDecorrelator implements ReflectiveVisitor {
       final Map<RexFieldAccess, Correlation> mapFieldAccessToCorVar =
           new HashMap<RexFieldAccess, Correlation>();
 
-      final Map<String, Correlation> mapNameToCorVar = Maps.newHashMap();
-
       final Holder<Integer> offset = Holder.of(0);
+      final int[] corrIdGenerator = new int[1];
       final RelShuttleImpl shuttle = new RelShuttleImpl() {
         @Override public RelNode visit(LogicalJoin join) {
+          join.getCondition().accept(rexVisitor(join));
           return visitJoin(join);
         }
 
@@ -2615,20 +2655,16 @@ public class RelDecorrelator implements ReflectiveVisitor {
           return super.visitChild(parent, i, stripHep(child));
         }
 
-        @Override public RelNode visit(Correlator correlator) {
-          for (Correlation c : correlator.getCorrelations()) {
-            mapNameToCorVar.put("$cor" + c.getId(), c);
-            mapCorVarToCorRel.put(c, correlator);
-          }
-          return visitJoin(correlator);
+        @Override public RelNode visit(LogicalCorrelate correlate) {
+          mapCorVarToCorRel.put(correlate.getCorrelationId(), correlate);
+          return visitJoin(correlate);
         }
 
-        private Join visitJoin(Join join) {
-          join.getCondition().accept(rexVisitor(join));
+        private RelNode visitJoin(BiRel join) {
           final int x = offset.get();
           visitChild(join, 0, join.getLeft());
           offset.set(x + join.getLeft().getRowType().getFieldCount());
-          visitChild(join, 0, join.getRight());
+          visitChild(join, 1, join.getRight());
           offset.set(x);
           return join;
         }
@@ -2652,7 +2688,10 @@ public class RelDecorrelator implements ReflectiveVisitor {
               if (ref instanceof RexCorrelVariable) {
                 final RexCorrelVariable var = (RexCorrelVariable) ref;
                 final Correlation correlation =
-                    mapNameToCorVar.get(var.getName());
+                    new Correlation(
+                        new CorrelationId(var.getName()),
+                        fieldAccess.getField().getIndex(),
+                        corrIdGenerator[0]++);
                 mapFieldAccessToCorVar.put(fieldAccess, correlation);
                 mapRefRelToCorVar.put(rel, correlation);
               }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
index f3557c6..c66b1a2 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/RelStructuredTypeFlattener.java
@@ -25,13 +25,13 @@ import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelVisitor;
 import org.apache.calcite.rel.core.Collect;
-import org.apache.calcite.rel.core.Correlation;
-import org.apache.calcite.rel.core.Correlator;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Sample;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -61,6 +61,7 @@ import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.ReflectUtil;
 import org.apache.calcite.util.ReflectiveVisitDispatcher;
 import org.apache.calcite.util.ReflectiveVisitor;
@@ -143,10 +144,10 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
   //~ Methods ----------------------------------------------------------------
 
   public void updateRelInMap(
-      SortedSetMultimap<RelNode, Correlation> mapRefRelToCorVar) {
+      SortedSetMultimap<RelNode, CorrelationId> mapRefRelToCorVar) {
     for (RelNode rel : Lists.newArrayList(mapRefRelToCorVar.keySet())) {
       if (oldToNewRelMap.containsKey(rel)) {
-        SortedSet<Correlation> corVarSet =
+        SortedSet<CorrelationId> corVarSet =
             mapRefRelToCorVar.removeAll(rel);
         mapRefRelToCorVar.putAll(oldToNewRelMap.get(rel), corVarSet);
       }
@@ -154,13 +155,13 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
   }
 
   public void updateRelInMap(
-      SortedMap<Correlation, Correlator> mapCorVarToCorRel) {
-    for (Correlation corVar : mapCorVarToCorRel.keySet()) {
-      Correlator oldRel = mapCorVarToCorRel.get(corVar);
+      SortedMap<CorrelationId, LogicalCorrelate> mapCorVarToCorRel) {
+    for (CorrelationId corVar : mapCorVarToCorRel.keySet()) {
+      LogicalCorrelate oldRel = mapCorVarToCorRel.get(corVar);
       if (oldToNewRelMap.containsKey(oldRel)) {
         RelNode newRel = oldToNewRelMap.get(oldRel);
-        assert newRel instanceof Correlator;
-        mapCorVarToCorRel.put(corVar, (Correlator) newRel);
+        assert newRel instanceof LogicalCorrelate;
+        mapCorVarToCorRel.put(corVar, (LogicalCorrelate) newRel);
       }
     }
   }
@@ -417,28 +418,24 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
     setNewForOldRel(rel, newRel);
   }
 
-  public void rewriteRel(Correlator rel) {
-    final List<Correlation> newCorrelations =
-        new ArrayList<Correlation>();
-    for (Correlation c : rel.getCorrelations()) {
+  public void rewriteRel(LogicalCorrelate rel) {
+    ImmutableBitSet.Builder newPos = ImmutableBitSet.builder();
+    for (Integer pos : rel.getRequiredColumns()) {
       RelDataType corrFieldType =
-          rel.getLeft().getRowType().getFieldList().get(c.getOffset())
+          rel.getLeft().getRowType().getFieldList().get(pos)
               .getType();
       if (corrFieldType.isStruct()) {
         throw Util.needToImplement("correlation on structured type");
       }
-      newCorrelations.add(
-          new Correlation(
-              c.getId(),
-              getNewForOldInput(c.getOffset())));
+      newPos.set(getNewForOldInput(pos));
     }
-    Correlator newRel =
-        new Correlator(
+    LogicalCorrelate newRel =
+        new LogicalCorrelate(
             rel.getCluster(),
             getNewForOldRel(rel.getLeft()),
             getNewForOldRel(rel.getRight()),
-            rel.getCondition(),
-            newCorrelations,
+            rel.getCorrelationId(),
+            newPos.build(),
             rel.getJoinType());
     setNewForOldRel(rel, newRel);
   }
@@ -700,7 +697,6 @@ public class RelStructuredTypeFlattener implements ReflectiveVisitor {
   public void rewriteRel(LogicalTableScan rel) {
     RelNode newRel =
         rel.getTable().toRel(toRelContext);
-
     setNewForOldRel(rel, newRel);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 2d0cf6c..09b074d 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -30,11 +30,11 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttle;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.core.Collect;
-import org.apache.calcite.rel.core.Correlation;
-import org.apache.calcite.rel.core.Correlator;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
@@ -43,6 +43,7 @@ import org.apache.calcite.rel.core.Sample;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.Uncollect;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalIntersect;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalMinus;
@@ -75,6 +76,7 @@ import org.apache.calcite.schema.ModifiableTable;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.sql.JoinConditionType;
 import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
@@ -2049,7 +2051,14 @@ public class SqlToRelConverter {
 
     Set<String> correlatedVariables = RelOptUtil.getVariablesUsed(rightRel);
     if (correlatedVariables.size() > 0) {
-      final List<Correlation> correlations = Lists.newArrayList();
+      final ImmutableBitSet.Builder requiredColumns = ImmutableBitSet.builder();
+      final List<String> correlNames = Lists.newArrayList();
+
+      // All correlations must refer the same namespace since correlation
+      // produces exactly one correlation source.
+      // The same source might be referenced by different variables since
+      // DeferredLookups are not de-duplicated at create time.
+      SqlValidatorNamespace prevNs = null;
 
       for (String correlName : correlatedVariables) {
         DeferredLookup lookup = mapCorrelToDeferred.get(correlName);
@@ -2073,66 +2082,86 @@ public class SqlToRelConverter {
         SqlValidatorScope ancestorScope = ancestorScopes[0];
         boolean correlInCurrentScope = ancestorScope == bb.scope;
 
-        if (correlInCurrentScope) {
-          int namespaceOffset = 0;
-          if (childNamespaceIndex > 0) {
-            // If not the first child, need to figure out the width
-            // of output types from all the preceding namespaces
-            assert ancestorScope instanceof ListScope;
-            List<SqlValidatorNamespace> children =
-                ((ListScope) ancestorScope).getChildren();
-
-            for (int i = 0; i < childNamespaceIndex; i++) {
-              SqlValidatorNamespace child = children.get(i);
-              namespaceOffset +=
-                  child.getRowType().getFieldCount();
-            }
-          }
+        if (!correlInCurrentScope) {
+          continue;
+        }
 
-          RelDataTypeField field =
-              catalogReader.field(foundNs.getRowType(), originalFieldName);
-          int pos = namespaceOffset + field.getIndex();
-
-          assert field.getType()
-              == lookup.getFieldAccess(correlName).getField().getType();
-
-          assert pos != -1;
-
-          if (bb.mapRootRelToFieldProjection.containsKey(bb.root)) {
-            // bb.root is an aggregate and only projects group by
-            // keys.
-            Map<Integer, Integer> exprProjection =
-                bb.mapRootRelToFieldProjection.get(bb.root);
-
-            // subquery can reference group by keys projected from
-            // the root of the outer relation.
-            if (exprProjection.containsKey(pos)) {
-              pos = exprProjection.get(pos);
-            } else {
-              // correl not grouped
-              throw Util.newInternal(
-                  "Identifier '" + originalRelName + "."
-                  + originalFieldName + "' is not a group expr");
-            }
+        if (prevNs == null) {
+          prevNs = foundNs;
+        } else {
+          assert prevNs == foundNs : "All correlation variables should resolve"
+              + " to the same namespace."
+              + " Prev ns=" + prevNs
+              + ", new ns=" + foundNs;
+        }
+
+        int namespaceOffset = 0;
+        if (childNamespaceIndex > 0) {
+          // If not the first child, need to figure out the width
+          // of output types from all the preceding namespaces
+          assert ancestorScope instanceof ListScope;
+          List<SqlValidatorNamespace> children =
+              ((ListScope) ancestorScope).getChildren();
+
+          for (int i = 0; i < childNamespaceIndex; i++) {
+            SqlValidatorNamespace child = children.get(i);
+            namespaceOffset +=
+                child.getRowType().getFieldCount();
           }
+        }
+
+        RelDataTypeField field =
+            catalogReader.field(foundNs.getRowType(), originalFieldName);
+        int pos = namespaceOffset + field.getIndex();
+
+        assert field.getType()
+            == lookup.getFieldAccess(correlName).getField().getType();
 
-          Correlation newCorVar =
-              new Correlation(
-                  getCorrelOrdinal(correlName),
-                  pos);
+        assert pos != -1;
 
-          correlations.add(newCorVar);
+        if (bb.mapRootRelToFieldProjection.containsKey(bb.root)) {
+          // bb.root is an aggregate and only projects group by
+          // keys.
+          Map<Integer, Integer> exprProjection =
+              bb.mapRootRelToFieldProjection.get(bb.root);
+
+          // subquery can reference group by keys projected from
+          // the root of the outer relation.
+          if (exprProjection.containsKey(pos)) {
+            pos = exprProjection.get(pos);
+          } else {
+            // correl not grouped
+            throw Util.newInternal(
+                "Identifier '" + originalRelName + "."
+                + originalFieldName + "' is not a group expr");
+          }
         }
+
+        requiredColumns.set(pos);
+        correlNames.add(correlName);
       }
 
-      if (!correlations.isEmpty()) {
-        return new Correlator(
+      if (!correlNames.isEmpty()) {
+        if (correlNames.size() > 1) {
+          // The same table was referenced more than once.
+          // So we deduplicate
+          RelShuttle dedup =
+              new DeduplicateCorrelateVariables(rexBuilder,
+                  correlNames.get(0),
+                  ImmutableSet.copyOf(Util.skip(correlNames)));
+          rightRel = rightRel.accept(dedup);
+        }
+        LogicalCorrelate corr = new LogicalCorrelate(
             rightRel.getCluster(),
             leftRel,
             rightRel,
-            joinCond,
-            correlations,
-            joinType);
+            new CorrelationId(correlNames.get(0)),
+            requiredColumns.build(),
+            SemiJoinType.of(joinType));
+        if (!joinCond.isAlwaysTrue()) {
+          return RelOptUtil.createFilter(corr, joinCond);
+        }
+        return corr;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/tools/Programs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/tools/Programs.java b/core/src/main/java/org/apache/calcite/tools/Programs.java
index e9aa502..a16e83c 100644
--- a/core/src/main/java/org/apache/calcite/tools/Programs.java
+++ b/core/src/main/java/org/apache/calcite/tools/Programs.java
@@ -100,6 +100,7 @@ public class Programs {
       ImmutableSet.of(
           EnumerableRules.ENUMERABLE_JOIN_RULE,
           EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
+          EnumerableRules.ENUMERABLE_CORRELATE_RULE,
           EnumerableRules.ENUMERABLE_PROJECT_RULE,
           EnumerableRules.ENUMERABLE_FILTER_RULE,
           EnumerableRules.ENUMERABLE_AGGREGATE_RULE,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index be15c2e..ad83a92 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -23,6 +23,7 @@ import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.interpreter.Row;
 import org.apache.calcite.linq4j.AbstractEnumerable;
+import org.apache.calcite.linq4j.CorrelateJoinType;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.EnumerableDefaults;
 import org.apache.calcite.linq4j.Enumerator;
@@ -123,6 +124,8 @@ public enum BuiltInMethod {
   SLICE0(Enumerables.class, "slice0", Enumerable.class),
   SEMI_JOIN(Enumerables.class, "semiJoin", Enumerable.class, Enumerable.class,
       Function1.class, Function1.class),
+  CORRELATE_JOIN(ExtendedEnumerable.class, "correlateJoin",
+      CorrelateJoinType.class, Function1.class, Function2.class),
   SELECT(ExtendedEnumerable.class, "select", Function1.class),
   SELECT2(ExtendedEnumerable.class, "select", Function2.class),
   SELECT_MANY(ExtendedEnumerable.class, "selectMany", Function1.class),

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java b/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
index 2f69f21..eb04640 100644
--- a/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
+++ b/core/src/main/java/org/apache/calcite/util/trace/CalciteTrace.java
@@ -18,9 +18,9 @@ package org.apache.calcite.util.trace;
 
 import org.apache.calcite.linq4j.function.Function2;
 import org.apache.calcite.linq4j.function.Functions;
+import org.apache.calcite.plan.RelImplementor;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.prepare.Prepare;
-import org.apache.calcite.rel.RelImplementorImpl;
 
 import java.io.File;
 import java.util.logging.Level;
@@ -96,7 +96,7 @@ public abstract class CalciteTrace {
    * expressions are bound to variables ({@link Level#FINE})
    */
   public static Logger getRelImplementorTracer() {
-    return Logger.getLogger(RelImplementorImpl.class.getName());
+    return Logger.getLogger(RelImplementor.class.getName());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteSuite.java b/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
index f3ad06a..aeeb360 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.test.SqlAdvisorTest;
 import org.apache.calcite.sql.test.SqlOperatorTest;
 import org.apache.calcite.sql.test.SqlPrettyWriterTest;
 import org.apache.calcite.sql.test.SqlTypeNameTest;
+import org.apache.calcite.test.enumerable.EnumerableCorrelateTest;
 import org.apache.calcite.tools.FrameworksTest;
 import org.apache.calcite.tools.PlannerTest;
 import org.apache.calcite.util.BitSetsTest;
@@ -99,6 +100,7 @@ import org.junit.runners.Suite;
     SqlOperatorTest.class,
     ChunkListTest.class,
     FrameworksTest.class,
+    EnumerableCorrelateTest.class,
 
     // slow tests (above 1s)
     PlannerTest.class,

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/JdbcTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/JdbcTest.java b/core/src/test/java/org/apache/calcite/test/JdbcTest.java
index 426aa78..9b7d947 100644
--- a/core/src/test/java/org/apache/calcite/test/JdbcTest.java
+++ b/core/src/test/java/org/apache/calcite/test/JdbcTest.java
@@ -4176,7 +4176,7 @@ public class JdbcTest {
             new Function<RelNode, Void>() {
               public Void apply(RelNode relNode) {
                 String s = RelOptUtil.toString(relNode);
-                assertThat(s, not(containsString("Correlator")));
+                assertThat(s, not(containsString("Correlate")));
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index 76e18ce..62b754a 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -838,6 +838,24 @@ public class SqlToRelConverterTest extends SqlToRelTestBase {
         "${plan}");
   }
 
+  @Test public void testNestedCorrelations() {
+    tester.withDecorrelation(false).assertConvertsTo(
+        "select * from (select 2+deptno d2, 3+deptno d3 from emp) e\n"
+            + " where exists (select 1 from (select deptno+1 d1 from dept) d\n"
+            + " where d1=e.d2 and exists (select 2 from (select deptno+4 d4, deptno+5 d5, deptno+6 d6 from dept)\n"
+            + " where d4=d.d1 and d5=d.d1 and d6=e.d3))",
+        "${plan}");
+  }
+
+  @Test public void testNestedCorrelationsDecorrelated() {
+    tester.withDecorrelation(true).assertConvertsTo(
+        "select * from (select 2+deptno d2, 3+deptno d3 from emp) e\n"
+            + " where exists (select 1 from (select deptno+1 d1 from dept) d\n"
+            + " where d1=e.d2 and exists (select 2 from (select deptno+4 d4, deptno+5 d5, deptno+6 d6 from dept)\n"
+            + " where d4=d.d1 and d5=d.d1 and d6=e.d3))",
+        "${plan}");
+  }
+
   @Test public void testElement() {
     check("select element(multiset[5]) from emp", "${plan}");
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableCorrelateTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableCorrelateTest.java b/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableCorrelateTest.java
new file mode 100644
index 0000000..a725e52
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/test/enumerable/EnumerableCorrelateTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+import org.apache.calcite.util.Bug;
+
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.Properties;
+
+/**
+ * Tests {@link org.apache.calcite.adapter.enumerable.EnumerableCorrelate}
+ */
+public class EnumerableCorrelateTest {
+  @Test public void simpleCorrelateDecorrelated() {
+    tester(true, new JdbcTest.HrSchema())
+        .query(
+            "select empid, name from emps e where exists (select 1 from depts d where d.deptno=e.deptno)")
+        .explainContains(
+            "EnumerableCalc(expr#0..4=[{inputs}], empid=[$t0], name=[$t2])\n"
+            + "  EnumerableSemiJoin(condition=[=($1, $5)], joinType=[inner])\n"
+            + "    EnumerableTableScan(table=[[s, emps]])\n"
+            + "    EnumerableCalc(expr#0..3=[{inputs}], expr#4=[true], $f01=[$t0], $f0=[$t4])\n"
+            + "      EnumerableJoin(condition=[=($0, $1)], joinType=[inner])\n"
+            + "        EnumerableAggregate(group=[{0}])\n"
+            + "          EnumerableCalc(expr#0..4=[{inputs}], $f0=[$t1])\n"
+            + "            EnumerableTableScan(table=[[s, emps]])\n"
+            + "        EnumerableTableScan(table=[[s, depts]])")
+        .returnsUnordered(
+            "empid=100; name=Bill",
+            "empid=110; name=Theodore",
+            "empid=150; name=Sebastian");
+
+  }
+
+  @Test public void simpleCorrelate() {
+    tester(false, new JdbcTest.HrSchema())
+        .query(
+            "select empid, name from emps e where exists (select 1 from depts d where d.deptno=e.deptno)")
+        .explainContains(
+            "EnumerableCalc(expr#0..5=[{inputs}], expr#6=[IS NOT NULL($t5)], empid=[$t0], name=[$t2], $condition=[$t6])\n"
+            + "  EnumerableCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{1}])\n"
+            + "    EnumerableTableScan(table=[[s, emps]])\n"
+            + "    EnumerableAggregate(group=[{}], agg#0=[MIN($0)])\n"
+            + "      EnumerableCalc(expr#0..2=[{inputs}], expr#3=[true], expr#4=[$cor0], expr#5=[$t4.deptno], expr#6=[=($t0, $t5)], $f0=[$t3], $condition=[$t6])\n"
+            + "        EnumerableTableScan(table=[[s, depts]])")
+        .returnsUnordered(
+            "empid=100; name=Bill",
+            "empid=110; name=Theodore",
+            "empid=150; name=Sebastian");
+
+  }
+
+  private CalciteAssert.AssertThat tester(final boolean forceDecorrelate,
+      final Object schema) {
+    Bug.remark(
+        "CALCITE-489 - Teach CalciteAssert to respect multiple settings");
+    final Properties p = new Properties();
+    p.setProperty("lex", "JAVA");
+    p.setProperty("forceDecorrelate", Boolean.toString(forceDecorrelate));
+    return CalciteAssert.that()
+        .with(new CalciteAssert.ConnectionFactory() {
+          public CalciteConnection createConnection() throws Exception {
+            Connection connection =
+                DriverManager.getConnection("jdbc:calcite:", p);
+            CalciteConnection calciteConnection =
+                connection.unwrap(CalciteConnection.class);
+            SchemaPlus rootSchema =
+                calciteConnection.getRootSchema();
+            rootSchema.add("s", new ReflectiveSchema(schema));
+            calciteConnection.setSchema("s");
+            return calciteConnection;
+          }
+        });
+  }
+}
+
+// End EnumerableCorrelateTest.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/java/org/apache/calcite/test/enumerable/package-info.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/calcite/test/enumerable/package-info.java b/core/src/test/java/org/apache/calcite/test/enumerable/package-info.java
new file mode 100644
index 0000000..0b3113d
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/test/enumerable/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests for Enumerable convention runtime.
+ */
+package org.apache.calcite.test.enumerable;
+
+// End package-info.java