You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by jh...@apache.org on 2016/01/11 05:47:24 UTC

[23/27] calcite git commit: [CALCITE-816] Represent sub-query as a RexNode

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java b/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java
index c65499a..aacd72a 100644
--- a/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalJoin.java
@@ -23,11 +23,13 @@ import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttle;
 import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.rex.RexNode;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
@@ -73,7 +75,7 @@ public final class LogicalJoin extends Join {
    * @param right            Right input
    * @param condition        Join condition
    * @param joinType         Join type
-   * @param variablesStopped Set of names of variables which are set by the
+   * @param variablesSet     Set of variables that are set by the
    *                         LHS and used by the RHS and are not available to
    *                         nodes above this LogicalJoin in the tree
    * @param semiJoinDone     Whether this join has been translated to a
@@ -89,22 +91,30 @@ public final class LogicalJoin extends Join {
       RelNode left,
       RelNode right,
       RexNode condition,
+      Set<CorrelationId> variablesSet,
       JoinRelType joinType,
-      Set<String> variablesStopped,
       boolean semiJoinDone,
       ImmutableList<RelDataTypeField> systemFieldList) {
-    super(cluster, traitSet, left, right, condition, joinType,
-        variablesStopped);
-    assert systemFieldList != null;
+    super(cluster, traitSet, left, right, condition, variablesSet, joinType);
     this.semiJoinDone = semiJoinDone;
-    this.systemFieldList = systemFieldList;
+    this.systemFieldList = Preconditions.checkNotNull(systemFieldList);
+  }
+
+  @Deprecated // to be removed before 2.0
+  public LogicalJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left,
+      RelNode right, RexNode condition, JoinRelType joinType,
+      Set<String> variablesStopped, boolean semiJoinDone,
+      ImmutableList<RelDataTypeField> systemFieldList) {
+    this(cluster, traitSet, left, right, condition,
+        CorrelationId.setOf(variablesStopped), joinType, semiJoinDone,
+        systemFieldList);
   }
 
   @Deprecated // to be removed before 2.0
   public LogicalJoin(RelOptCluster cluster, RelNode left, RelNode right,
       RexNode condition, JoinRelType joinType, Set<String> variablesStopped) {
     this(cluster, cluster.traitSetOf(Convention.NONE), left, right, condition,
-        joinType, variablesStopped, false,
+        CorrelationId.setOf(variablesStopped), joinType, false,
         ImmutableList.<RelDataTypeField>of());
   }
 
@@ -113,53 +123,62 @@ public final class LogicalJoin extends Join {
       RexNode condition, JoinRelType joinType, Set<String> variablesStopped,
       boolean semiJoinDone, ImmutableList<RelDataTypeField> systemFieldList) {
     this(cluster, cluster.traitSetOf(Convention.NONE), left, right, condition,
-        joinType, variablesStopped, semiJoinDone, systemFieldList);
+        CorrelationId.setOf(variablesStopped), joinType, semiJoinDone,
+        systemFieldList);
   }
 
   /**
    * Creates a LogicalJoin by parsing serialized output.
    */
   public LogicalJoin(RelInput input) {
-    this(
-        input.getCluster(), input.getInputs().get(0),
-        input.getInputs().get(1), input.getExpression("condition"),
-        input.getEnum("joinType", JoinRelType.class),
-        ImmutableSet.<String>of(), false,
+    this(input.getCluster(), input.getCluster().traitSetOf(Convention.NONE),
+        input.getInputs().get(0), input.getInputs().get(1),
+        input.getExpression("condition"), ImmutableSet.<CorrelationId>of(),
+        input.getEnum("joinType", JoinRelType.class), false,
         ImmutableList.<RelDataTypeField>of());
   }
 
   /** Creates a LogicalJoin, flagged with whether it has been translated to a
    * semi-join. */
   public static LogicalJoin create(RelNode left, RelNode right,
-      RexNode condition, JoinRelType joinType, Set<String> variablesStopped,
+      RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType,
       boolean semiJoinDone, ImmutableList<RelDataTypeField> systemFieldList) {
     final RelOptCluster cluster = left.getCluster();
     final RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
-    return new LogicalJoin(cluster, traitSet, left, right, condition, joinType,
-        variablesStopped, semiJoinDone, systemFieldList);
+    return new LogicalJoin(cluster, traitSet, left, right, condition,
+        variablesSet, joinType, semiJoinDone, systemFieldList);
+  }
+
+  @Deprecated // to be removed before 2.0
+  public static LogicalJoin create(RelNode left, RelNode right,
+      RexNode condition, JoinRelType joinType, Set<String> variablesStopped,
+      boolean semiJoinDone, ImmutableList<RelDataTypeField> systemFieldList) {
+    return create(left, right, condition, CorrelationId.setOf(variablesStopped),
+        joinType, semiJoinDone, systemFieldList);
   }
 
   /** Creates a LogicalJoin. */
   public static LogicalJoin create(RelNode left, RelNode right,
-      RexNode condition, JoinRelType joinType, Set<String> variablesStopped) {
-    return create(left, right, condition, joinType, variablesStopped, false,
+      RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+    return create(left, right, condition, variablesSet, joinType, false,
         ImmutableList.<RelDataTypeField>of());
   }
 
+  @Deprecated // to be removed before 2.0
+  public static LogicalJoin create(RelNode left, RelNode right,
+      RexNode condition, JoinRelType joinType, Set<String> variablesStopped) {
+    return create(left, right, condition, CorrelationId.setOf(variablesStopped),
+        joinType, false, ImmutableList.<RelDataTypeField>of());
+  }
+
   //~ Methods ----------------------------------------------------------------
 
   @Override public LogicalJoin copy(RelTraitSet traitSet, RexNode conditionExpr,
       RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
     assert traitSet.containsIfApplicable(Convention.NONE);
-    return new LogicalJoin(
-        getCluster(),
-        left,
-        right,
-        conditionExpr,
-        joinType,
-        this.variablesStopped,
-        semiJoinDone,
-        this.systemFieldList);
+    return new LogicalJoin(getCluster(),
+        getCluster().traitSetOf(Convention.NONE), left, right, conditionExpr,
+        variablesSet, joinType, semiJoinDone, systemFieldList);
   }
 
   @Override public RelNode accept(RelShuttle shuttle) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
index db6788a..d77ca13 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdUniqueKeys.java
@@ -28,7 +28,6 @@ import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.util.BitSets;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
 
@@ -76,11 +75,11 @@ public class RelMdUniqueKeys {
     //
     // Further more, the unique bitset coming from the child needs
     // to be mapped to match the output of the project.
-    Map<Integer, Integer> mapInToOutPos = new HashMap<Integer, Integer>();
+    Map<Integer, Integer> mapInToOutPos = new HashMap<>();
 
     List<RexNode> projExprs = rel.getProjects();
 
-    Set<ImmutableBitSet> projUniqueKeySet = new HashSet<ImmutableBitSet>();
+    Set<ImmutableBitSet> projUniqueKeySet = new HashSet<>();
 
     // Build an input to output position map.
     for (int i = 0; i < projExprs.size(); i++) {
@@ -105,7 +104,7 @@ public class RelMdUniqueKeys {
       for (ImmutableBitSet colMask : childUniqueKeySet) {
         ImmutableBitSet.Builder tmpMask = ImmutableBitSet.builder();
         boolean completeKeyProjected = true;
-        for (int bit : BitSets.toIter(colMask)) {
+        for (int bit : colMask) {
           if (mapInToOutPos.containsKey(bit)) {
             tmpMask.set(mapInToOutPos.get(bit));
           } else {
@@ -137,7 +136,7 @@ public class RelMdUniqueKeys {
     // that is undesirable, use RelMetadataQuery.areColumnsUnique() as
     // an alternative way of getting unique key information.
 
-    Set<ImmutableBitSet> retSet = new HashSet<ImmutableBitSet>();
+    Set<ImmutableBitSet> retSet = new HashSet<>();
     Set<ImmutableBitSet> leftSet =
         RelMetadataQuery.getUniqueKeys(left, ignoreNulls);
     Set<ImmutableBitSet> rightSet = null;
@@ -147,7 +146,7 @@ public class RelMdUniqueKeys {
     int nFieldsOnLeft = left.getRowType().getFieldCount();
 
     if (tmpRightSet != null) {
-      rightSet = new HashSet<ImmutableBitSet>();
+      rightSet = new HashSet<>();
       for (ImmutableBitSet colMask : tmpRightSet) {
         ImmutableBitSet.Builder tmpMask = ImmutableBitSet.builder();
         for (int bit : colMask) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rel/rules/EquiJoin.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/EquiJoin.java b/core/src/main/java/org/apache/calcite/rel/rules/EquiJoin.java
index 32f4606..52a0474 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/EquiJoin.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/EquiJoin.java
@@ -19,6 +19,7 @@ package org.apache.calcite.rel.rules;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.ImmutableIntList;
@@ -38,7 +39,7 @@ public abstract class EquiJoin extends org.apache.calcite.rel.core.EquiJoin {
       ImmutableIntList rightKeys, JoinRelType joinType,
       Set<String> variablesStopped) {
     super(cluster, traits, left, right, condition, leftKeys, rightKeys,
-        joinType, variablesStopped);
+        CorrelationId.setOf(variablesStopped), joinType);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java b/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
index 5280524..5afbe2a 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/JoinToCorrelateRule.java
@@ -102,10 +102,9 @@ public class JoinToCorrelateRule extends RelOptRule {
     final RelOptCluster cluster = join.getCluster();
     final RexBuilder rexBuilder = cluster.getRexBuilder();
     final RelBuilder relBuilder = call.builder();
-    final int dynInId = cluster.createCorrel();
-    final CorrelationId correlationId = new CorrelationId(dynInId);
+    final CorrelationId correlationId = cluster.createCorrel();
     final RexNode corrVar =
-        rexBuilder.makeCorrel(left.getRowType(), correlationId.getName());
+        rexBuilder.makeCorrel(left.getRowType(), correlationId);
     final ImmutableBitSet.Builder requiredColumns = ImmutableBitSet.builder();
 
     // Replace all references of left input with FieldAccess(corrVar, field)

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rel/rules/JoinUnionTransposeRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/JoinUnionTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/JoinUnionTransposeRule.java
index c1a7f84..4e522c8 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/JoinUnionTransposeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/JoinUnionTransposeRule.java
@@ -70,7 +70,7 @@ public class JoinUnionTransposeRule extends RelOptRule {
     if (!unionRel.all) {
       return;
     }
-    if (!join.getVariablesStopped().isEmpty()) {
+    if (!join.getVariablesSet().isEmpty()) {
       return;
     }
     // The UNION ALL cannot be on the null generating side

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java b/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
new file mode 100644
index 0000000..c3daf49
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rel/rules/SubQueryRemoveRule.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.LogicVisitor;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql2rel.RelDecorrelator;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Pair;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Transform that converts IN, EXISTS and scalar sub-queries into joins.
+ *
+ * <p>Sub-queries are represented by {@link RexSubQuery} expressions.
+ *
+ * <p>A sub-query may or may not be correlated. If a sub-query is correlated,
+ * the wrapped {@link RelNode} will contain a {@link RexCorrelVariable} before
+ * the rewrite, and the product of the rewrite will be a {@link Correlate}.
+ * The Correlate can be removed using {@link RelDecorrelator}.
+ */
+public abstract class SubQueryRemoveRule extends RelOptRule {
+  public static final SubQueryRemoveRule PROJECT =
+      new SubQueryRemoveRule(
+          operand(Project.class, null, RexUtil.SubQueryFinder.PROJECT_PREDICATE,
+              any()),
+          RelFactories.LOGICAL_BUILDER, "SubQueryRemoveRule:Project") {
+        public void onMatch(RelOptRuleCall call) {
+          final Project project = call.rel(0);
+          final RelBuilder builder = call.builder();
+          final RexSubQuery e =
+              RexUtil.SubQueryFinder.find(project.getProjects());
+          assert e != null;
+          final RelOptUtil.Logic logic =
+              LogicVisitor.find(RelOptUtil.Logic.TRUE_FALSE_UNKNOWN,
+                  project.getProjects(), e);
+          builder.push(project.getInput());
+          final int fieldCount = builder.peek().getRowType().getFieldCount();
+          final RexNode target = apply(e, ImmutableSet.<CorrelationId>of(),
+              logic, builder, 1, fieldCount);
+          final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target);
+          builder.project(shuttle.apply(project.getProjects()),
+              project.getRowType().getFieldNames());
+          call.transformTo(builder.build());
+        }
+      };
+
+  public static final SubQueryRemoveRule FILTER =
+      new SubQueryRemoveRule(
+          operand(Filter.class, null, RexUtil.SubQueryFinder.FILTER_PREDICATE,
+              any()),
+          RelFactories.LOGICAL_BUILDER, "SubQueryRemoveRule:Filter") {
+        public void onMatch(RelOptRuleCall call) {
+          final Filter filter = call.rel(0);
+          final RelBuilder builder = call.builder();
+          final RexSubQuery e =
+              RexUtil.SubQueryFinder.find(filter.getCondition());
+          assert e != null;
+          final RelOptUtil.Logic logic =
+              LogicVisitor.find(RelOptUtil.Logic.TRUE,
+                  ImmutableList.of(filter.getCondition()), e);
+          builder.push(filter.getInput());
+          final int fieldCount = builder.peek().getRowType().getFieldCount();
+          final RexNode target = apply(e, filter.getVariablesSet(), logic,
+              builder, 1, fieldCount);
+          final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target);
+          builder.filter(shuttle.apply(filter.getCondition()));
+          builder.project(fields(builder, filter.getRowType().getFieldCount()));
+          call.transformTo(builder.build());
+        }
+      };
+
+  public static final SubQueryRemoveRule JOIN =
+      new SubQueryRemoveRule(
+          operand(Join.class, null, RexUtil.SubQueryFinder.JOIN_PREDICATE,
+              any()), RelFactories.LOGICAL_BUILDER, "SubQueryRemoveRule:Join") {
+        public void onMatch(RelOptRuleCall call) {
+          final Join join = call.rel(0);
+          final RelBuilder builder = call.builder();
+          final RexSubQuery e =
+              RexUtil.SubQueryFinder.find(join.getCondition());
+          assert e != null;
+          final RelOptUtil.Logic logic =
+              LogicVisitor.find(RelOptUtil.Logic.TRUE,
+                  ImmutableList.of(join.getCondition()), e);
+          builder.push(join.getLeft());
+          builder.push(join.getRight());
+          final int fieldCount = join.getRowType().getFieldCount();
+          final RexNode target = apply(e, ImmutableSet.<CorrelationId>of(),
+              logic, builder, 2, fieldCount);
+          final RexShuttle shuttle = new ReplaceSubQueryShuttle(e, target);
+          builder.join(join.getJoinType(), shuttle.apply(join.getCondition()));
+          builder.project(fields(builder, join.getRowType().getFieldCount()));
+          call.transformTo(builder.build());
+        }
+      };
+
+  private SubQueryRemoveRule(RelOptRuleOperand operand,
+      RelBuilderFactory relBuilderFactory,
+      String description) {
+    super(operand, relBuilderFactory, description);
+  }
+
+  protected RexNode apply(RexSubQuery e, Set<CorrelationId> variablesSet,
+      RelOptUtil.Logic logic,
+      RelBuilder builder, int inputCount, int offset) {
+    switch (e.getKind()) {
+    case SCALAR_QUERY:
+      builder.push(e.rel);
+      final Boolean unique = RelMetadataQuery.areColumnsUnique(builder.peek(),
+          ImmutableBitSet.of());
+      if (unique == null || !unique) {
+        builder.aggregate(builder.groupKey(),
+            builder.aggregateCall(SqlStdOperatorTable.SINGLE_VALUE, false, null,
+                null, builder.field(0)));
+      }
+      builder.join(JoinRelType.LEFT);
+      return field(builder, inputCount, offset);
+
+    case IN:
+    case EXISTS:
+      // Most general case, where the left and right keys might have nulls, and
+      // caller requires 3-valued logic return.
+      //
+      // select e.deptno, e.deptno in (select deptno from emp)
+      //
+      // becomes
+      //
+      // select e.deptno,
+      //   case
+      //   when ct.c = 0 then false
+      //   when dt.i is not null then true
+      //   when e.deptno is null then null
+      //   when ct.ck < ct.c then null
+      //   else false
+      //   end
+      // from e
+      // left join (
+      //   (select count(*) as c, count(deptno) as ck from emp) as ct
+      //   cross join (select distinct deptno, true as i from emp)) as dt
+      //   on e.deptno = dt.deptno
+      //
+      // If keys are not null we can remove "ct" and simplify to
+      //
+      // select e.deptno,
+      //   case
+      //   when dt.i is not null then true
+      //   else false
+      //   end
+      // from e
+      // left join (select distinct deptno, true as i from emp) as dt
+      //   on e.deptno = dt.deptno
+      //
+      // We could further simplify to
+      //
+      // select e.deptno,
+      //   dt.i is not null
+      // from e
+      // left join (select distinct deptno, true as i from emp) as dt
+      //   on e.deptno = dt.deptno
+      //
+      // but have not yet.
+      //
+      // If the logic is TRUE we can just kill the record if the condition
+      // evaluates to FALSE or UNKNOWN. Thus the query simplifies to an inner
+      // join:
+      //
+      // select e.deptno,
+      //   true
+      // from e
+      // inner join (select distinct deptno from emp) as dt
+      //   on e.deptno = dt.deptno
+      //
+
+      builder.push(e.rel);
+      final List<RexNode> fields = new ArrayList<>();
+      switch (e.getKind()) {
+      case IN:
+        fields.addAll(builder.fields());
+      }
+
+      // First, the cross join
+      switch (logic) {
+      case TRUE_FALSE_UNKNOWN:
+      case UNKNOWN_AS_TRUE:
+        if (!variablesSet.isEmpty()) {
+          // We have not yet figured out how to include "ct" in a query if
+          // the source relation "e.rel" is correlated. So, dodge the issue:
+          // we pretend that the join key is NOT NULL.
+          //
+          // We will get wrong results in correlated IN where the join
+          // key has nulls. E.g.
+          //
+          //   SELECT *
+          //   FROM emp
+          //   WHERE mgr NOT IN (
+          //     SELECT mgr
+          //     FROM emp AS e2
+          //     WHERE
+          logic = RelOptUtil.Logic.TRUE_FALSE;
+          break;
+        }
+        builder.aggregate(builder.groupKey(),
+            builder.count(false, "c"),
+            builder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, "ck",
+                builder.fields()));
+        builder.as("ct");
+        builder.join(JoinRelType.INNER);
+        offset += 2;
+        builder.push(e.rel);
+        break;
+      }
+
+      // Now the left join
+      switch (logic) {
+      case TRUE:
+        if (fields.isEmpty()) {
+          builder.project(builder.alias(builder.literal(true), "i"));
+          builder.aggregate(builder.groupKey(0));
+        } else {
+          builder.aggregate(builder.groupKey(fields));
+        }
+        break;
+      default:
+        fields.add(builder.alias(builder.literal(true), "i"));
+        builder.project(fields);
+        builder.distinct();
+      }
+      builder.as("dt");
+      final List<RexNode> conditions = new ArrayList<>();
+      for (Pair<RexNode, RexNode> pair
+          : Pair.zip(e.getOperands(), builder.fields())) {
+        conditions.add(
+            builder.equals(pair.left, RexUtil.shift(pair.right, offset)));
+      }
+      switch (logic) {
+      case TRUE:
+        builder.join(JoinRelType.INNER, builder.and(conditions), variablesSet);
+        return builder.literal(true);
+      }
+      builder.join(JoinRelType.LEFT, builder.and(conditions), variablesSet);
+
+      final List<RexNode> keyIsNulls = new ArrayList<>();
+      for (RexNode operand : e.getOperands()) {
+        if (operand.getType().isNullable()) {
+          keyIsNulls.add(builder.isNull(operand));
+        }
+      }
+      final ImmutableList.Builder<RexNode> operands = ImmutableList.builder();
+      switch (logic) {
+      case TRUE_FALSE_UNKNOWN:
+      case UNKNOWN_AS_TRUE:
+        operands.add(
+            builder.equals(builder.field("ct", "c"), builder.literal(0)),
+            builder.literal(false));
+        break;
+      }
+      operands.add(builder.isNotNull(builder.field("dt", "i")),
+          builder.literal(true));
+      if (!keyIsNulls.isEmpty()) {
+        operands.add(builder.or(keyIsNulls), builder.literal(null));
+      }
+      Boolean b = true;
+      switch (logic) {
+      case TRUE_FALSE_UNKNOWN:
+        b = null;
+        // fall through
+      case UNKNOWN_AS_TRUE:
+        operands.add(
+            builder.call(SqlStdOperatorTable.LESS_THAN,
+                builder.field("ct", "ck"), builder.field("ct", "c")),
+            builder.literal(b));
+        break;
+      }
+      operands.add(builder.literal(false));
+      return builder.call(SqlStdOperatorTable.CASE, operands.build());
+
+    default:
+      throw new AssertionError(e.getKind());
+    }
+  }
+
+  /** Returns a reference to a particular field, by offset, across several
+   * inputs on a {@link RelBuilder}'s stack. */
+  private RexInputRef field(RelBuilder builder, int inputCount, int offset) {
+    for (int inputOrdinal = 0;;) {
+      final RelNode r = builder.peek(inputCount, inputOrdinal);
+      if (offset < r.getRowType().getFieldCount()) {
+        return builder.field(inputCount, inputOrdinal, offset);
+      }
+      ++inputOrdinal;
+      offset -= r.getRowType().getFieldCount();
+    }
+  }
+
+  /** Returns a list of expressions that project the first {@code fieldCount}
+   * fields of the top input on a {@link RelBuilder}'s stack. */
+  private static List<RexNode> fields(RelBuilder builder, int fieldCount) {
+    final List<RexNode> projects = new ArrayList<>();
+    for (int i = 0; i < fieldCount; i++) {
+      projects.add(builder.field(i));
+    }
+    return projects;
+  }
+
+  /** Shuttle that replaces occurrences of a given
+   * {@link org.apache.calcite.rex.RexSubQuery} with a replacement
+   * expression. */
+  private static class ReplaceSubQueryShuttle extends RexShuttle {
+    private final RexSubQuery subQuery;
+    private final RexNode replacement;
+
+    public ReplaceSubQueryShuttle(RexSubQuery subQuery, RexNode replacement) {
+      this.subQuery = subQuery;
+      this.replacement = replacement;
+    }
+
+    @Override public RexNode visitSubQuery(RexSubQuery subQuery) {
+      return RexUtil.eq(subQuery, this.subQuery) ? replacement : subQuery;
+    }
+  }
+}
+
+// End SubQueryRemoveRule.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 48e7bbf..acb4eb6 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -251,13 +251,15 @@ public class StreamRules {
       final RelNode right = join.getRight();
 
       final LogicalDelta rightWithDelta = LogicalDelta.create(right);
-      final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta, join.getCondition(),
-          join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(),
+      final LogicalJoin joinL = LogicalJoin.create(left, rightWithDelta,
+          join.getCondition(), join.getVariablesSet(), join.getJoinType(),
+          join.isSemiJoinDone(),
           ImmutableList.copyOf(join.getSystemFieldList()));
 
       final LogicalDelta leftWithDelta = LogicalDelta.create(left);
-      final LogicalJoin joinR = LogicalJoin.create(leftWithDelta, right, join.getCondition(),
-          join.getJoinType(), join.getVariablesStopped(), join.isSemiJoinDone(),
+      final LogicalJoin joinR = LogicalJoin.create(leftWithDelta, right,
+          join.getCondition(), join.getVariablesSet(), join.getJoinType(),
+          join.isSemiJoinDone(),
           ImmutableList.copyOf(join.getSystemFieldList()));
 
       List<RelNode> inputsToUnion = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/LogicVisitor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/LogicVisitor.java b/core/src/main/java/org/apache/calcite/rex/LogicVisitor.java
new file mode 100644
index 0000000..610a004
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rex/LogicVisitor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rex;
+
+import org.apache.calcite.plan.RelOptUtil.Logic;
+
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Visitor pattern for traversing a tree of {@link RexNode} objects.
+ */
+public class LogicVisitor implements RexBiVisitor<Logic, Logic> {
+  private final RexNode seek;
+  private final Collection<Logic> logicCollection;
+
+  /** Creates a LogicVisitor. */
+  private LogicVisitor(RexNode seek, Collection<Logic> logicCollection) {
+    this.seek = seek;
+    this.logicCollection = logicCollection;
+  }
+
+  /** Finds a suitable logic for evaluating {@code seek} within a list of
+   * expressions.
+   *
+   * <p>Chooses a logic that is safe (that is, gives the right
+   * answer) with the fewest possibilities (that is, we prefer one that
+   * returns [true as true, false as false, unknown as false] over one that
+   * distinguishes false from unknown).
+   */
+  public static Logic find(Logic logic, List<RexNode> nodes,
+      RexNode seek) {
+    final Set<Logic> set = EnumSet.noneOf(Logic.class);
+    final LogicVisitor visitor = new LogicVisitor(seek, set);
+    for (RexNode node : nodes) {
+      node.accept(visitor, logic);
+    }
+    switch (set.size()) {
+    case 0:
+      throw new IllegalArgumentException("not found: " + seek);
+    case 1:
+      return Iterables.getOnlyElement(set);
+    default:
+      return Logic.TRUE_FALSE_UNKNOWN;
+    }
+  }
+
+  public static void collect(RexNode node, RexNode seek, Logic logic,
+      Collection<Logic> logicCollection) {
+    node.accept(new LogicVisitor(seek, logicCollection), logic);
+  }
+
+  public Logic visitCall(RexCall call, Logic logic) {
+    final Logic arg0 = logic;
+    switch (call.getKind()) {
+    case IS_NOT_NULL:
+    case IS_NULL:
+      logic = Logic.TRUE_FALSE_UNKNOWN;
+      break;
+    case IS_TRUE:
+    case IS_NOT_TRUE:
+      logic = Logic.UNKNOWN_AS_FALSE;
+      break;
+    case IS_FALSE:
+    case IS_NOT_FALSE:
+      logic = Logic.UNKNOWN_AS_TRUE;
+      break;
+    case NOT:
+      logic = logic.negate();
+      break;
+    case CASE:
+      logic = Logic.TRUE_FALSE_UNKNOWN;
+      break;
+    }
+    switch (logic) {
+    case TRUE:
+      switch (call.getKind()) {
+      case AND:
+        break;
+      default:
+        logic = Logic.TRUE_FALSE;
+      }
+    }
+    for (RexNode operand : call.operands) {
+      operand.accept(this, logic);
+    }
+    return end(call, arg0);
+  }
+
+  private Logic end(RexNode node, Logic arg) {
+    if (RexUtil.eq(node, seek)) {
+      logicCollection.add(arg);
+    }
+    return arg;
+  }
+
+  public Logic visitInputRef(RexInputRef inputRef, Logic arg) {
+    return end(inputRef, arg);
+  }
+
+  public Logic visitLocalRef(RexLocalRef localRef, Logic arg) {
+    return end(localRef, arg);
+  }
+
+  public Logic visitLiteral(RexLiteral literal, Logic arg) {
+    return end(literal, arg);
+  }
+
+  public Logic visitOver(RexOver over, Logic arg) {
+    return end(over, arg);
+  }
+
+  public Logic visitCorrelVariable(RexCorrelVariable correlVariable,
+      Logic arg) {
+    return end(correlVariable, arg);
+  }
+
+  public Logic visitDynamicParam(RexDynamicParam dynamicParam, Logic arg) {
+    return end(dynamicParam, arg);
+  }
+
+  public Logic visitRangeRef(RexRangeRef rangeRef, Logic arg) {
+    return end(rangeRef, arg);
+  }
+
+  public Logic visitFieldAccess(RexFieldAccess fieldAccess, Logic arg) {
+    return end(fieldAccess, arg);
+  }
+
+  public Logic visitSubQuery(RexSubQuery subQuery, Logic arg) {
+    if (!subQuery.getType().isNullable()) {
+      if (arg == Logic.TRUE_FALSE_UNKNOWN) {
+        arg = Logic.TRUE_FALSE;
+      }
+    }
+    return end(subQuery, arg);
+  }
+}
+
+// End LogicVisitor.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexBiVisitor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexBiVisitor.java b/core/src/main/java/org/apache/calcite/rex/RexBiVisitor.java
new file mode 100644
index 0000000..8f7e0d3
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rex/RexBiVisitor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rex;
+
+/**
+ * Visitor pattern for traversing a tree of {@link RexNode} objects
+ * and passing a payload to each.
+ *
+ * @see RexVisitor
+ *
+ * @param <R> Return type
+ * @param <P> Payload type
+ */
+public interface RexBiVisitor<R, P> {
+  //~ Methods ----------------------------------------------------------------
+
+  R visitInputRef(RexInputRef inputRef, P arg);
+
+  R visitLocalRef(RexLocalRef localRef, P arg);
+
+  R visitLiteral(RexLiteral literal, P arg);
+
+  R visitCall(RexCall call, P arg);
+
+  R visitOver(RexOver over, P arg);
+
+  R visitCorrelVariable(RexCorrelVariable correlVariable, P arg);
+
+  R visitDynamicParam(RexDynamicParam dynamicParam, P arg);
+
+  R visitRangeRef(RexRangeRef rangeRef, P arg);
+
+  R visitFieldAccess(RexFieldAccess fieldAccess, P arg);
+
+  R visitSubQuery(RexSubQuery subQuery, P arg);
+}
+
+// End RexBiVisitor.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexBuilder.java b/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
index b8c64e3..dd147a5 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexBuilder.java
@@ -23,6 +23,7 @@ import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -431,14 +432,14 @@ public class RexBuilder {
   /**
    * Creates an expression referencing a correlation variable.
    *
+   * @param id Name of variable
    * @param type Type of variable
-   * @param name Name of variable
    * @return Correlation variable
    */
   public RexNode makeCorrel(
       RelDataType type,
-      String name) {
-    return new RexCorrelVariable(name, type);
+      CorrelationId id) {
+    return new RexCorrelVariable(id, type);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexCall.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexCall.java b/core/src/main/java/org/apache/calcite/rex/RexCall.java
index 7cf8255..c843651 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexCall.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexCall.java
@@ -45,9 +45,9 @@ import java.util.List;
 public class RexCall extends RexNode {
   //~ Instance fields --------------------------------------------------------
 
-  private final SqlOperator op;
+  public final SqlOperator op;
   public final ImmutableList<RexNode> operands;
-  private final RelDataType type;
+  public final RelDataType type;
 
   //~ Constructors -----------------------------------------------------------
 
@@ -108,6 +108,10 @@ public class RexCall extends RexNode {
     return visitor.visitCall(this);
   }
 
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitCall(this, arg);
+  }
+
   public RelDataType getType() {
     return type;
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexCorrelVariable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexCorrelVariable.java b/core/src/main/java/org/apache/calcite/rex/RexCorrelVariable.java
index ea1dade..4880c8e 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexCorrelVariable.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexCorrelVariable.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.rex;
 
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlKind;
 
@@ -27,12 +28,15 @@ import org.apache.calcite.sql.SqlKind;
  * assigned a value, and the other side of the join is restarted.</p>
  */
 public class RexCorrelVariable extends RexVariable {
+  public final CorrelationId id;
+
   //~ Constructors -----------------------------------------------------------
 
   RexCorrelVariable(
-      String varName,
+      CorrelationId id,
       RelDataType type) {
-    super(varName, type);
+    super(id.getName(), type);
+    this.id = id;
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -41,6 +45,10 @@ public class RexCorrelVariable extends RexVariable {
     return visitor.visitCorrelVariable(this);
   }
 
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitCorrelVariable(this, arg);
+  }
+
   @Override public SqlKind getKind() {
     return SqlKind.CORREL_VARIABLE;
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexDynamicParam.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexDynamicParam.java b/core/src/main/java/org/apache/calcite/rex/RexDynamicParam.java
index cd51cc9..6bbd692 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexDynamicParam.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexDynamicParam.java
@@ -55,6 +55,10 @@ public class RexDynamicParam extends RexVariable {
   public <R> R accept(RexVisitor<R> visitor) {
     return visitor.visitDynamicParam(this);
   }
+
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitDynamicParam(this, arg);
+  }
 }
 
 // End RexDynamicParam.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexFieldAccess.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexFieldAccess.java b/core/src/main/java/org/apache/calcite/rex/RexFieldAccess.java
index 09b9058..658ffc5 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexFieldAccess.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexFieldAccess.java
@@ -81,6 +81,10 @@ public class RexFieldAccess extends RexNode {
     return visitor.visitFieldAccess(this);
   }
 
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitFieldAccess(this, arg);
+  }
+
   /**
    * Returns the expression whose field is being accessed.
    */

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexInputRef.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexInputRef.java b/core/src/main/java/org/apache/calcite/rex/RexInputRef.java
index 0c475db..8019ad6 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexInputRef.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexInputRef.java
@@ -112,6 +112,10 @@ public class RexInputRef extends RexSlot {
     return visitor.visitInputRef(this);
   }
 
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitInputRef(this, arg);
+  }
+
   /**
    * Creates a name for an input reference, of the form "$index". If the index
    * is low, uses a cache of common names, to reduce gc.

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexLiteral.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexLiteral.java b/core/src/main/java/org/apache/calcite/rex/RexLiteral.java
index bd07072..acdebec 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexLiteral.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexLiteral.java
@@ -279,7 +279,7 @@ public class RexLiteral extends RexNode {
       }
       return litmus.succeed();
     } else if (o instanceof Map) {
-      final Map<Object, Object> map = (Map) o;
+      @SuppressWarnings("unchecked") final Map<Object, Object> map = (Map) o;
       for (Map.Entry entry : map.entrySet()) {
         if (!validConstant(entry.getKey(), litmus)) {
           return litmus.fail("not a constant: " + entry.getKey());
@@ -649,15 +649,17 @@ public class RexLiteral extends RexNode {
         && (((RexLiteral) node).value == null);
   }
 
-  private static boolean equals(
-      Object o1,
-      Object o2) {
+  private static boolean equals(Object o1, Object o2) {
     return (o1 == null) ? (o2 == null) : o1.equals(o2);
   }
 
   public <R> R accept(RexVisitor<R> visitor) {
     return visitor.visitLiteral(this);
   }
+
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitLiteral(this, arg);
+  }
 }
 
 // End RexLiteral.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexLocalRef.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexLocalRef.java b/core/src/main/java/org/apache/calcite/rex/RexLocalRef.java
index 9d4d1cc..57a3aff 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexLocalRef.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexLocalRef.java
@@ -77,6 +77,10 @@ public class RexLocalRef extends RexSlot {
     return visitor.visitLocalRef(this);
   }
 
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitLocalRef(this, arg);
+  }
+
   private static String createName(int index) {
     return NAMES.get(index);
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexNode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexNode.java b/core/src/main/java/org/apache/calcite/rex/RexNode.java
index 0922116..95df460 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexNode.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexNode.java
@@ -90,6 +90,12 @@ public abstract class RexNode {
    * which applies a visitor to several expressions simultaneously.
    */
   public abstract <R> R accept(RexVisitor<R> visitor);
+
+  /**
+   * Accepts a visitor with a payload, dispatching to the right overloaded
+   * {@link RexBiVisitor#visitInputRef(RexInputRef, Object)} visitXxx} method.
+   */
+  public abstract <R, P> R accept(RexBiVisitor<R, P> visitor, P arg);
 }
 
 // End RexNode.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexOver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexOver.java b/core/src/main/java/org/apache/calcite/rex/RexOver.java
index 3b45f36..5c2c881 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexOver.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexOver.java
@@ -89,6 +89,10 @@ public class RexOver extends RexCall {
     return visitor.visitOver(this);
   }
 
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitOver(this, arg);
+  }
+
   /**
    * Returns whether an expression contains an OVER clause.
    */

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexProgram.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexProgram.java b/core/src/main/java/org/apache/calcite/rex/RexProgram.java
index 664c092..30382c5 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexProgram.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexProgram.java
@@ -859,6 +859,10 @@ public class RexProgram {
       return expr.accept(this);
     }
 
+    @Override public Boolean visitOver(RexOver over) {
+      return false;
+    }
+
     @Override public Boolean visitCorrelVariable(RexCorrelVariable correlVariable) {
       // Correlating variables are constant WITHIN A RESTART, so that's
       // good enough.

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexRangeRef.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexRangeRef.java b/core/src/main/java/org/apache/calcite/rex/RexRangeRef.java
index bf2104c..04aa277 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexRangeRef.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexRangeRef.java
@@ -70,6 +70,10 @@ public class RexRangeRef extends RexNode {
   public <R> R accept(RexVisitor<R> visitor) {
     return visitor.visitRangeRef(this);
   }
+
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitRangeRef(this, arg);
+  }
 }
 
 // End RexRangeRef.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexShuttle.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexShuttle.java b/core/src/main/java/org/apache/calcite/rex/RexShuttle.java
index e3c0302..f0e5820 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexShuttle.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexShuttle.java
@@ -78,6 +78,16 @@ public class RexShuttle implements RexVisitor<RexNode> {
     }
   }
 
+  public RexNode visitSubQuery(RexSubQuery subQuery) {
+    boolean[] update = {false};
+    List<RexNode> clonedOperands = visitList(subQuery.operands, update);
+    if (update[0]) {
+      return subQuery.clone(subQuery.getType(), clonedOperands);
+    } else {
+      return subQuery;
+    }
+  }
+
   public RexNode visitCall(final RexCall call) {
     boolean[] update = {false};
     List<RexNode> clonedOperands = visitList(call.operands, update);
@@ -238,7 +248,7 @@ public class RexShuttle implements RexVisitor<RexNode> {
     if (exprList == null) {
       return null;
     }
-    final List<T> list2 = new ArrayList<T>(exprList);
+    final List<T> list2 = new ArrayList<>(exprList);
     if (mutate(list2)) {
       return list2;
     } else {

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexSubQuery.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexSubQuery.java b/core/src/main/java/org/apache/calcite/rex/RexSubQuery.java
new file mode 100644
index 0000000..d8fff4e
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/rex/RexSubQuery.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rex;
+
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Scalar expression that represents an IN, EXISTS or scalar sub-query.
+ */
+public class RexSubQuery extends RexCall {
+  public final RelNode rel;
+
+  private RexSubQuery(RelDataType type, SqlOperator op,
+      ImmutableList<RexNode> operands, RelNode rel) {
+    super(type, op, operands);
+    this.rel = rel;
+  }
+
+  /** Creates an IN sub-query. */
+  public static RexSubQuery in(RelNode rel, ImmutableList<RexNode> nodes) {
+    assert rel.getRowType().getFieldCount() == nodes.size();
+    final RelDataTypeFactory typeFactory = rel.getCluster().getTypeFactory();
+    boolean nullable = false;
+    for (RexNode node : nodes) {
+      if (node.getType().isNullable()) {
+        nullable = true;
+      }
+    }
+    for (RelDataTypeField field : rel.getRowType().getFieldList()) {
+      if (field.getType().isNullable()) {
+        nullable = true;
+      }
+    }
+    final RelDataType type =
+        typeFactory.createTypeWithNullability(
+            typeFactory.createSqlType(SqlTypeName.BOOLEAN), nullable);
+    return new RexSubQuery(type, SqlStdOperatorTable.IN, nodes, rel);
+  }
+
+  /** Creates an EXISTS sub-query. */
+  public static RexSubQuery exists(RelNode rel) {
+    final RelDataTypeFactory typeFactory = rel.getCluster().getTypeFactory();
+    final RelDataType type = typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+    return new RexSubQuery(type, SqlStdOperatorTable.EXISTS,
+        ImmutableList.<RexNode>of(), rel);
+  }
+
+  /** Creates a scalar sub-query. */
+  public static RexSubQuery scalar(RelNode rel) {
+    final List<RelDataTypeField> fieldList = rel.getRowType().getFieldList();
+    assert fieldList.size() == 1;
+    final RelDataTypeFactory typeFactory = rel.getCluster().getTypeFactory();
+    final RelDataType type =
+        typeFactory.createTypeWithNullability(fieldList.get(0).getType(), true);
+    return new RexSubQuery(type, SqlStdOperatorTable.SCALAR_QUERY,
+        ImmutableList.<RexNode>of(), rel);
+  }
+
+  public <R> R accept(RexVisitor<R> visitor) {
+    return visitor.visitSubQuery(this);
+  }
+
+  public <R, P> R accept(RexBiVisitor<R, P> visitor, P arg) {
+    return visitor.visitSubQuery(this, arg);
+  }
+
+  @Override protected String computeDigest(boolean withType) {
+    StringBuilder sb = new StringBuilder(op.getName());
+    sb.append("(");
+    for (RexNode operand : operands) {
+      sb.append(operand.toString());
+      sb.append(", ");
+    }
+    sb.append("{\n");
+    sb.append(RelOptUtil.toString(rel));
+    sb.append("})");
+    return sb.toString();
+  }
+
+  @Override public RexSubQuery clone(RelDataType type, List<RexNode> operands) {
+    return new RexSubQuery(type, getOperator(),
+        ImmutableList.copyOf(operands), rel);
+  }
+
+  public RexSubQuery clone(RelNode rel) {
+    return new RexSubQuery(type, getOperator(), operands, rel);
+  }
+}
+
+// End RexSubQuery.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexUtil.java b/core/src/main/java/org/apache/calcite/rex/RexUtil.java
index b2de640..b766590 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexUtil.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexUtil.java
@@ -22,6 +22,9 @@ import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeFamily;
@@ -260,6 +263,10 @@ public class RexUtil {
       return false;
     }
 
+    public Boolean visitSubQuery(RexSubQuery subQuery) {
+      return false;
+    }
+
     public Boolean visitCorrelVariable(RexCorrelVariable correlVariable) {
       // Correlating variables change when there is an internal restart.
       // Not good enough for our purposes.
@@ -1248,7 +1255,7 @@ public class RexUtil {
     Iterator<RexNode> iterator = targets.iterator();
     while (iterator.hasNext()) {
       RexNode next = iterator.next();
-      if (equivalent(next, e)) {
+      if (eq(next, e)) {
         ++count;
         iterator.remove();
       }
@@ -1256,11 +1263,12 @@ public class RexUtil {
     return count > 0;
   }
 
-  /** Returns whether two expressions are equivalent. */
-  private static boolean equivalent(RexNode e1, RexNode e2) {
-    // TODO: make broader;
-    // 1. 'x = y' should be equivalent to 'y = x'.
-    // 2. 'c2 and c1' should be equivalent to 'c1 and c2'.
+  /** Returns whether two {@link RexNode}s are structurally equal.
+   *
+   * <p>This method considers structure, not semantics. 'x &lt; y' is not
+   * equivalent to 'y &gt; x'.
+   */
+  public static boolean eq(RexNode e1, RexNode e2) {
     return e1 == e2 || e1.toString().equals(e2.toString());
   }
 
@@ -1341,8 +1349,12 @@ public class RexUtil {
     }
     switch (a.getKind()) {
     case NOT:
-      // NOT x IS TRUE ==> x IS NOT TRUE
+      // (NOT x) IS TRUE ==> x IS FALSE
       // Similarly for IS NOT TRUE, IS FALSE, etc.
+      //
+      // Note that
+      //   (NOT x) IS TRUE !=> x IS FALSE
+      // because of null values.
       return simplify(rexBuilder,
           rexBuilder.makeCall(op(kind.negate()),
               ((RexCall) a).getOperands().get(0)));
@@ -1990,7 +2002,7 @@ public class RexUtil {
   }
 
   /** Visitor that throws {@link org.apache.calcite.util.Util.FoundOne} if
-   * there an expression contains a {@link RexCorrelVariable}. */
+   * applied to an expression that contains a {@link RexCorrelVariable}. */
   private static class CorrelationFinder extends RexVisitorImpl<Void> {
     static final CorrelationFinder INSTANCE = new CorrelationFinder();
 
@@ -2030,6 +2042,81 @@ public class RexUtil {
       throw new AssertionError("mismatched type " + ref + " " + rightType);
     }
   }
+
+  /** Visitor that throws {@link org.apache.calcite.util.Util.FoundOne} if
+   * applied to an expression that contains a {@link RexSubQuery}. */
+  public static class SubQueryFinder extends RexVisitorImpl<Void> {
+    public static final SubQueryFinder INSTANCE = new SubQueryFinder();
+
+    /** Returns whether a {@link Project} contains a sub-query. */
+    public static final Predicate<Project> PROJECT_PREDICATE =
+        new Predicate<Project>() {
+          public boolean apply(Project project) {
+            for (RexNode node : project.getProjects()) {
+              try {
+                node.accept(INSTANCE);
+              } catch (Util.FoundOne e) {
+                return true;
+              }
+            }
+            return false;
+          }
+        };
+
+    /** Returns whether a {@link Filter} contains a sub-query. */
+    public static final Predicate<Filter> FILTER_PREDICATE =
+        new Predicate<Filter>() {
+          public boolean apply(Filter filter) {
+            try {
+              filter.getCondition().accept(INSTANCE);
+              return false;
+            } catch (Util.FoundOne e) {
+              return true;
+            }
+          }
+        };
+
+    /** Returns whether a {@link Join} contains a sub-query. */
+    public static final Predicate<Join> JOIN_PREDICATE =
+        new Predicate<Join>() {
+          public boolean apply(Join join) {
+            try {
+              join.getCondition().accept(INSTANCE);
+              return false;
+            } catch (Util.FoundOne e) {
+              return true;
+            }
+          }
+        };
+
+    private SubQueryFinder() {
+      super(true);
+    }
+
+    @Override public Void visitSubQuery(RexSubQuery subQuery) {
+      throw new Util.FoundOne(subQuery);
+    }
+
+    public static RexSubQuery find(Iterable<RexNode> nodes) {
+      for (RexNode node : nodes) {
+        try {
+          node.accept(INSTANCE);
+        } catch (Util.FoundOne e) {
+          return (RexSubQuery) e.getNode();
+        }
+      }
+      return null;
+    }
+
+    public static RexSubQuery find(RexNode node) {
+      try {
+        node.accept(INSTANCE);
+        return null;
+      } catch (Util.FoundOne e) {
+        return (RexSubQuery) e.getNode();
+      }
+    }
+  }
 }
 
 // End RexUtil.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexVisitor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexVisitor.java b/core/src/main/java/org/apache/calcite/rex/RexVisitor.java
index fd3d6b6..4c95647 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexVisitor.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexVisitor.java
@@ -45,6 +45,8 @@ public interface RexVisitor<R> {
   R visitRangeRef(RexRangeRef rangeRef);
 
   R visitFieldAccess(RexFieldAccess fieldAccess);
+
+  R visitSubQuery(RexSubQuery subQuery);
 }
 
 // End RexVisitor.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/rex/RexVisitorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/rex/RexVisitorImpl.java b/core/src/main/java/org/apache/calcite/rex/RexVisitorImpl.java
index fcc6ff7..ec31f26 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexVisitorImpl.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexVisitorImpl.java
@@ -98,6 +98,18 @@ public class RexVisitorImpl<R> implements RexVisitor<R> {
     return expr.accept(this);
   }
 
+  public R visitSubQuery(RexSubQuery subQuery) {
+    if (!deep) {
+      return null;
+    }
+
+    R r = null;
+    for (RexNode operand : subQuery.operands) {
+      r = operand.accept(this);
+    }
+    return r;
+  }
+
   /**
    * <p>Visits an array of expressions, returning the logical 'and' of their
    * results.

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java b/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java
index 8d5f380..ee5c02c 100644
--- a/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java
+++ b/core/src/main/java/org/apache/calcite/schema/SchemaPlus.java
@@ -26,7 +26,7 @@ import com.google.common.collect.ImmutableList;
  * <p>Given a user-defined schema that implements the {@link Schema} interface,
  * Calcite creates a wrapper that implements the {@code SchemaPlus} interface.
  * This provides extra functionality, such as access to tables that have been
- * added explicitly.</p>
+ * added explicitly.
  *
  * <p>A user-defined schema does not need to implement this interface, but by
  * the time a schema is passed to a method in a user-defined schema or

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/sql/SqlKind.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlKind.java b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
index 37a757d..270a311 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlKind.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlKind.java
@@ -913,15 +913,15 @@ public enum SqlKind {
   public SqlKind negate() {
     switch (this) {
     case IS_TRUE:
-      return IS_NOT_TRUE;
+      return IS_FALSE;
     case IS_FALSE:
-      return IS_NOT_FALSE;
+      return IS_TRUE;
     case IS_NULL:
       return IS_NOT_NULL;
     case IS_NOT_TRUE:
-      return IS_TRUE;
+      return IS_NOT_FALSE;
     case IS_NOT_FALSE:
-      return IS_FALSE;
+      return IS_NOT_TRUE;
     case IS_NOT_NULL:
       return IS_NULL;
     case IS_DISTINCT_FROM:

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 8b92b91..00b8611 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -1927,6 +1927,7 @@ public class SqlValidatorImpl implements SqlValidatorWithHints {
       if (newRight != right) {
         join.setRight(newRight);
       }
+      registerSubqueries(joinScope, join.getCondition());
       final JoinNamespace joinNamespace = new JoinNamespace(this, join);
       registerNamespace(null, null, joinNamespace, forceNullable);
       return join;

http://git-wip-us.apache.org/repos/asf/calcite/blob/505a9064/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java b/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
index a24924c..43661ef 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/DeduplicateCorrelateVariables.java
@@ -18,12 +18,14 @@ package org.apache.calcite.sql2rel;
 
 import org.apache.calcite.rel.RelHomogeneousShuttle;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexSubQuery;
 
-import java.util.Set;
+import com.google.common.collect.ImmutableSet;
 
 /**
  * Rewrites relations to ensure the same correlation is referenced by the same
@@ -32,39 +34,64 @@ import java.util.Set;
 public class DeduplicateCorrelateVariables extends RelHomogeneousShuttle {
   private final RexShuttle dedupRex;
 
+  /** Creates a DeduplicateCorrelateVariables. */
+  private DeduplicateCorrelateVariables(RexBuilder builder,
+      CorrelationId canonicalId, ImmutableSet<CorrelationId> alternateIds) {
+    dedupRex = new DeduplicateCorrelateVariablesShuttle(builder,
+        canonicalId, alternateIds, this);
+  }
+
+  /**
+   * Rewrites a relational expression, replacing alternate correlation variables
+   * with a canonical correlation variable.
+   */
+  public static RelNode go(RexBuilder builder, CorrelationId canonicalId,
+      Iterable<? extends CorrelationId> alternateIds, RelNode r) {
+    return r.accept(
+        new DeduplicateCorrelateVariables(builder, canonicalId,
+            ImmutableSet.copyOf(alternateIds)));
+  }
+
+  @Override public RelNode visit(RelNode other) {
+    RelNode next = super.visit(other);
+    return next.accept(dedupRex);
+  }
+
   /**
    * Replaces alternative names of correlation variable to its canonical name.
    */
   private static class DeduplicateCorrelateVariablesShuttle extends RexShuttle {
     private final RexBuilder builder;
-    private final String canonical;
-    private final Set<String> altNames;
+    private final CorrelationId canonicalId;
+    private final ImmutableSet<CorrelationId> alternateIds;
+    private final DeduplicateCorrelateVariables shuttle;
 
-    public DeduplicateCorrelateVariablesShuttle(RexBuilder builder,
-        String canonical, Set<String> altNames) {
+    private DeduplicateCorrelateVariablesShuttle(RexBuilder builder,
+        CorrelationId canonicalId, ImmutableSet<CorrelationId> alternateIds,
+        DeduplicateCorrelateVariables shuttle) {
       this.builder = builder;
-      this.canonical = canonical;
-      this.altNames = altNames;
+      this.canonicalId = canonicalId;
+      this.alternateIds = alternateIds;
+      this.shuttle = shuttle;
     }
 
     @Override public RexNode visitCorrelVariable(RexCorrelVariable variable) {
-      if (!altNames.contains(variable.getName())) {
+      if (!alternateIds.contains(variable.id)) {
         return variable;
       }
 
-      return builder.makeCorrel(variable.getType(), canonical);
+      return builder.makeCorrel(variable.getType(), canonicalId);
     }
-  }
-
-  public DeduplicateCorrelateVariables(RexBuilder builder,
-      String canonical, Set<String> altNames) {
-    dedupRex = new DeduplicateCorrelateVariablesShuttle(builder,
-        canonical, altNames);
-  }
 
-  @Override public RelNode visit(RelNode other) {
-    RelNode next = super.visit(other);
-    return next.accept(dedupRex);
+    @Override public RexNode visitSubQuery(RexSubQuery subQuery) {
+      if (shuttle != null) {
+        RelNode r = subQuery.rel.accept(shuttle); // look inside sub-queries
+        if (r != subQuery.rel) {
+          subQuery = subQuery.clone(r);
+        }
+      }
+      return super.visitSubQuery(subQuery);
+    }
   }
 }