You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by mm...@apache.org on 2019/01/07 17:14:58 UTC

[calcite] branch master updated: [CALCITE-2554] Enrich enumerable join operators with order preserving information (Stamatis Zampetakis)

This is an automated email from the ASF dual-hosted git repository.

mmior pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new 416d8c9  [CALCITE-2554] Enrich enumerable join operators with order preserving information (Stamatis Zampetakis)
416d8c9 is described below

commit 416d8c91d6b3dbfad9673a5c9e8e7d97b5f15be0
Author: Stamatis Zampetakis <za...@gmail.com>
AuthorDate: Wed Sep 19 11:25:37 2018 +0200

    [CALCITE-2554] Enrich enumerable join operators with order preserving information (Stamatis Zampetakis)
    
    1. Introduce collation metadata information for EnumerableJoin, EnumerableThetaJoin, EnumerableSemiJoin, and EnumerableCorrelate.
    
    2. Modify static factory create methods to obtain the collation information from the metadata.
    
    3. Change EnumerableJoinRule, EnumerableCorrelateRule to use the static factory method to create the operators.
    
    4. Add SortRemoveRuleTest, exploiting the preserved collation of various enumerable join operatrs to remove a sort.
    
    5. Adapt sql/join.iq removing redundant sort operators.
    
    6. Adapt sql/sub-query.iq due to CALCITE-2582.
    
    7. Add tests verifying the order preserving properties of the join algorithms in EnumerableDefaults.
---
 .../adapter/enumerable/EnumerableCorrelate.java    |  26 ++
 .../enumerable/EnumerableCorrelateRule.java        |   7 +-
 .../calcite/adapter/enumerable/EnumerableJoin.java |   7 +-
 .../adapter/enumerable/EnumerableJoinRule.java     |  15 +-
 .../adapter/enumerable/EnumerableSemiJoin.java     |  18 +-
 .../adapter/enumerable/EnumerableThetaJoin.java    |  19 +
 .../calcite/rel/metadata/RelMdCollation.java       |  86 +++++
 .../calcite/rel/rules/SortRemoveRuleTest.java      | 214 +++++++++++
 .../apache/calcite/schemas/HrClusteredSchema.java  | 130 +++++++
 .../java/org/apache/calcite/test/CalciteSuite.java |   2 +
 .../java/org/apache/calcite/tools/PlannerTest.java |  42 ++
 core/src/test/resources/sql/join.iq                |   9 +-
 core/src/test/resources/sql/sub-query.iq           |   4 +-
 .../calcite/linq4j/test/JoinPreserveOrderTest.java | 426 +++++++++++++++++++++
 .../apache/calcite/linq4j/test/Linq4jSuite.java    |   3 +-
 15 files changed, 982 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelate.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelate.java
index bdb583c..e0a2d4b 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelate.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelate.java
@@ -23,9 +23,12 @@ import org.apache.calcite.linq4j.tree.ParameterExpression;
 import org.apache.calcite.linq4j.tree.Primitive;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Correlate;
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
@@ -48,6 +51,29 @@ public class EnumerableCorrelate extends Correlate
         joinType);
   }
 
+  /** Creates an EnumerableCorrelate. */
+  public static EnumerableCorrelate create(
+      RelNode left,
+      RelNode right,
+      CorrelationId correlationId,
+      ImmutableBitSet requiredColumns,
+      SemiJoinType joinType) {
+    final RelOptCluster cluster = left.getCluster();
+    final RelMetadataQuery mq = cluster.getMetadataQuery();
+    final RelTraitSet traitSet =
+        cluster.traitSetOf(EnumerableConvention.INSTANCE)
+            .replaceIfs(RelCollationTraitDef.INSTANCE,
+                () -> RelMdCollation.enumerableCorrelate(mq, left, right, joinType));
+    return new EnumerableCorrelate(
+        cluster,
+        traitSet,
+        left,
+        right,
+        correlationId,
+        requiredColumns,
+        joinType);
+  }
+
   @Override public EnumerableCorrelate copy(RelTraitSet traitSet,
       RelNode left, RelNode right, CorrelationId correlationId,
       ImmutableBitSet requiredColumns, SemiJoinType joinType) {
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelateRule.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelateRule.java
index 2b543a2..fa3c557 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelateRule.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableCorrelateRule.java
@@ -17,7 +17,6 @@
 package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.plan.Convention;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
@@ -42,11 +41,7 @@ public class EnumerableCorrelateRule extends ConverterRule {
 
   public RelNode convert(RelNode rel) {
     final LogicalCorrelate c = (LogicalCorrelate) rel;
-    final RelTraitSet traitSet =
-        c.getTraitSet().replace(EnumerableConvention.INSTANCE);
-    return new EnumerableCorrelate(
-        rel.getCluster(),
-        traitSet,
+    return EnumerableCorrelate.create(
         convert(c.getLeft(), c.getLeft().getTraitSet()
             .replace(EnumerableConvention.INSTANCE)),
         convert(c.getRight(), c.getRight().getTraitSet()
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java
index cfd00a4..768e568 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoin.java
@@ -24,12 +24,14 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelNodes;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.EquiJoin;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMdCollation;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.BuiltInMethod;
@@ -89,8 +91,11 @@ public class EnumerableJoin extends EquiJoin implements EnumerableRel {
       JoinRelType joinType)
       throws InvalidRelException {
     final RelOptCluster cluster = left.getCluster();
+    final RelMetadataQuery mq = cluster.getMetadataQuery();
     final RelTraitSet traitSet =
-        cluster.traitSetOf(EnumerableConvention.INSTANCE);
+        cluster.traitSetOf(EnumerableConvention.INSTANCE)
+            .replaceIfs(RelCollationTraitDef.INSTANCE,
+                () -> RelMdCollation.enumerableJoin(mq, left, right, joinType));
     return new EnumerableJoin(cluster, traitSet, left, right, condition,
         leftKeys, rightKeys, variablesSet, joinType);
   }
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoinRule.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoinRule.java
index 375568b..ee7f118 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoinRule.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableJoinRule.java
@@ -18,7 +18,6 @@ package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
@@ -55,8 +54,6 @@ class EnumerableJoinRule extends ConverterRule {
       newInputs.add(input);
     }
     final RelOptCluster cluster = join.getCluster();
-    final RelTraitSet traitSet =
-        join.getTraitSet().replace(EnumerableConvention.INSTANCE);
     final RelNode left = newInputs.get(0);
     final RelNode right = newInputs.get(1);
     final JoinInfo info = JoinInfo.of(left, right, join.getCondition());
@@ -64,8 +61,12 @@ class EnumerableJoinRule extends ConverterRule {
       // EnumerableJoinRel only supports equi-join. We can put a filter on top
       // if it is an inner join.
       try {
-        return new EnumerableThetaJoin(cluster, traitSet, left, right,
-            join.getCondition(), join.getVariablesSet(), join.getJoinType());
+        return EnumerableThetaJoin.create(
+            left,
+            right,
+            join.getCondition(),
+            join.getVariablesSet(),
+            join.getJoinType());
       } catch (InvalidRelException e) {
         EnumerableRules.LOGGER.debug(e.toString());
         return null;
@@ -73,9 +74,7 @@ class EnumerableJoinRule extends ConverterRule {
     }
     RelNode newRel;
     try {
-      newRel = new EnumerableJoin(
-          cluster,
-          join.getTraitSet().replace(EnumerableConvention.INSTANCE),
+      newRel = EnumerableJoin.create(
           left,
           right,
           info.getEquiCondition(left, right, cluster.getRexBuilder()),
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSemiJoin.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSemiJoin.java
index 0162c51..6b7254a 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSemiJoin.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSemiJoin.java
@@ -24,10 +24,12 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rel.metadata.RelMdCollation;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.BuiltInMethod;
@@ -56,10 +58,20 @@ public class EnumerableSemiJoin extends SemiJoin implements EnumerableRel {
   public static EnumerableSemiJoin create(RelNode left, RelNode right, RexNode condition,
       ImmutableIntList leftKeys, ImmutableIntList rightKeys) {
     final RelOptCluster cluster = left.getCluster();
+    final RelMetadataQuery mq = cluster.getMetadataQuery();
+    final RelTraitSet traitSet =
+        cluster.traitSetOf(EnumerableConvention.INSTANCE)
+            .replaceIfs(RelCollationTraitDef.INSTANCE,
+                () -> RelMdCollation.enumerableSemiJoin(mq, left, right));
     try {
-      return new EnumerableSemiJoin(cluster,
-          cluster.traitSetOf(EnumerableConvention.INSTANCE), left,
-          right, condition, leftKeys, rightKeys);
+      return new EnumerableSemiJoin(
+          cluster,
+          traitSet,
+          left,
+          right,
+          condition,
+          leftKeys,
+          rightKeys);
     } catch (InvalidRelException e) {
       // Semantic error not possible. Must be a bug. Convert to
       // internal error.
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableThetaJoin.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableThetaJoin.java
index b052247..612b5c3 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableThetaJoin.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableThetaJoin.java
@@ -26,10 +26,12 @@ import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMdCollation;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgramBuilder;
@@ -73,6 +75,23 @@ public class EnumerableThetaJoin extends Join implements EnumerableRel {
     }
   }
 
+  /** Creates an EnumerableThetaJoin. */
+  public static EnumerableThetaJoin create(
+      RelNode left,
+      RelNode right,
+      RexNode condition,
+      Set<CorrelationId> variablesSet,
+      JoinRelType joinType) throws InvalidRelException {
+    final RelOptCluster cluster = left.getCluster();
+    final RelMetadataQuery mq = cluster.getMetadataQuery();
+    final RelTraitSet traitSet =
+        cluster.traitSetOf(EnumerableConvention.INSTANCE)
+            .replaceIfs(RelCollationTraitDef.INSTANCE,
+                () -> RelMdCollation.enumerableThetaJoin(mq, left, right, joinType));
+    return new EnumerableThetaJoin(cluster, traitSet, left, right, condition,
+        variablesSet, joinType);
+  }
+
   @Override public RelOptCost computeSelfCost(RelOptPlanner planner,
       RelMetadataQuery mq) {
     double rowCount = mq.getRowCount(this);
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
index aa42f42..5c807f9 100644
--- a/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/RelMdCollation.java
@@ -16,7 +16,11 @@
  */
 package org.apache.calcite.rel.metadata;
 
+import org.apache.calcite.adapter.enumerable.EnumerableCorrelate;
+import org.apache.calcite.adapter.enumerable.EnumerableJoin;
 import org.apache.calcite.adapter.enumerable.EnumerableMergeJoin;
+import org.apache.calcite.adapter.enumerable.EnumerableSemiJoin;
+import org.apache.calcite.adapter.enumerable.EnumerableThetaJoin;
 import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.hep.HepRelVertex;
@@ -29,6 +33,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Calc;
 import org.apache.calcite.rel.core.Filter;
 import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.SortExchange;
@@ -42,6 +47,7 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
+import org.apache.calcite.sql.SemiJoinType;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableIntList;
@@ -131,6 +137,33 @@ public class RelMdCollation
             join.getLeftKeys(), join.getRightKeys()));
   }
 
+  public ImmutableList<RelCollation> collations(EnumerableJoin join,
+      RelMetadataQuery mq) {
+    return ImmutableList.copyOf(
+        RelMdCollation.enumerableJoin(mq, join.getLeft(), join.getRight(), join.getJoinType()));
+  }
+
+  public ImmutableList<RelCollation> collations(EnumerableThetaJoin join,
+      RelMetadataQuery mq) {
+    return ImmutableList.copyOf(
+        RelMdCollation.enumerableThetaJoin(mq, join.getLeft(), join.getRight(), join.getJoinType())
+    );
+  }
+
+  public ImmutableList<RelCollation> collations(EnumerableCorrelate join,
+      RelMetadataQuery mq) {
+    return ImmutableList.copyOf(
+        RelMdCollation.enumerableCorrelate(mq, join.getLeft(), join.getRight(), join.getJoinType())
+    );
+  }
+
+  public ImmutableList<RelCollation> collations(EnumerableSemiJoin join,
+      RelMetadataQuery mq) {
+    return ImmutableList.copyOf(
+        RelMdCollation.enumerableSemiJoin(mq, join.getLeft(), join.getRight())
+    );
+  }
+
   public ImmutableList<RelCollation> collations(Sort sort,
       RelMetadataQuery mq) {
     return ImmutableList.copyOf(
@@ -380,6 +413,59 @@ public class RelMdCollation
     }
     return builder.build();
   }
+
+  /**
+   * Returns the collation of {@link EnumerableJoin} based on its inputs and the join type.
+   */
+  public static List<RelCollation> enumerableJoin(RelMetadataQuery mq,
+      RelNode left, RelNode right, JoinRelType joinType) {
+    return enumerableJoin0(mq, left, right, joinType);
+  }
+
+  /**
+   * Returns the collation of {@link EnumerableThetaJoin} based on its inputs and the join type.
+   */
+  public static List<RelCollation> enumerableThetaJoin(RelMetadataQuery mq,
+      RelNode left, RelNode right, JoinRelType joinType) {
+    return enumerableJoin0(mq, left, right, joinType);
+  }
+
+  public static List<RelCollation> enumerableCorrelate(RelMetadataQuery mq,
+      RelNode left, RelNode right, SemiJoinType joinType) {
+    // The current implementation always preserve the sort order of the left input
+    return mq.collations(left);
+  }
+
+  public static List<RelCollation> enumerableSemiJoin(RelMetadataQuery mq,
+      RelNode left, RelNode right) {
+    // The current implementation always preserve the sort order of the left input
+    return mq.collations(left);
+  }
+
+  private static List<RelCollation> enumerableJoin0(RelMetadataQuery mq,
+      RelNode left, RelNode right, JoinRelType joinType) {
+    // The current implementation can preserve the sort order of the left input if one of the
+    // following conditions hold:
+    // (i) join type is INNER or LEFT;
+    // (ii) RelCollation always orders nulls last.
+    final ImmutableList<RelCollation> leftCollations = mq.collations(left);
+    switch (joinType) {
+    case INNER:
+    case LEFT:
+      return leftCollations;
+    case RIGHT:
+    case FULL:
+      for (RelCollation collation : leftCollations) {
+        for (RelFieldCollation field : collation.getFieldCollations()) {
+          if (!(RelFieldCollation.NullDirection.LAST == field.nullDirection)) {
+            return ImmutableList.of();
+          }
+        }
+      }
+      return leftCollations;
+    }
+    return ImmutableList.of();
+  }
 }
 
 // End RelMdCollation.java
diff --git a/core/src/test/java/org/apache/calcite/rel/rules/SortRemoveRuleTest.java b/core/src/test/java/org/apache/calcite/rel/rules/SortRemoveRuleTest.java
new file mode 100644
index 0000000..16529ae
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/rel/rules/SortRemoveRuleTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.adapter.enumerable.EnumerableConvention;
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schemas.HrClusteredSchema;
+import org.apache.calcite.sql.SqlExplainFormat;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.Programs;
+import org.apache.calcite.tools.RuleSet;
+import org.apache.calcite.tools.RuleSets;
+import org.apache.calcite.util.Util;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests the application of the {@link SortRemoveRule}.
+ */
+public final class SortRemoveRuleTest {
+
+  /**
+   * The default schema that is used in these tests provides tables sorted on the primary key. Due
+   * to this scan operators always come with a {@link org.apache.calcite.rel.RelCollation} trait.
+   */
+  private RelNode transform(String sql, RuleSet prepareRules) throws Exception {
+    final SchemaPlus rootSchema = Frameworks.createRootSchema(true);
+    final SchemaPlus defSchema = rootSchema.add("hr", new HrClusteredSchema());
+    final FrameworkConfig config = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.Config.DEFAULT)
+        .defaultSchema(defSchema)
+        .traitDefs(ConventionTraitDef.INSTANCE, RelCollationTraitDef.INSTANCE)
+        .programs(
+            Programs.of(prepareRules),
+            Programs.ofRules(SortRemoveRule.INSTANCE))
+        .build();
+    Planner planner = Frameworks.getPlanner(config);
+    SqlNode parse = planner.parse(sql);
+    SqlNode validate = planner.validate(parse);
+    RelRoot planRoot = planner.rel(validate);
+    RelNode planBefore = planRoot.rel;
+    RelTraitSet desiredTraits = planBefore.getTraitSet()
+        .replace(EnumerableConvention.INSTANCE)
+        .replace(planRoot.collation).simplify();
+    RelNode planAfter = planner.transform(0, desiredTraits, planBefore);
+    return planner.transform(1, desiredTraits, planAfter);
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-2554">[CALCITE-2554]
+   * Enrich enumerable join operators with order preserving information</a>.
+   *
+   * Since join inputs are sorted, and this join preserves the order of the left input, there
+   * shouldn't be any sort operator above the join.
+   */
+  @Test public void removeSortOverEnumerableJoin() throws Exception {
+    RuleSet prepareRules =
+        RuleSets.ofList(
+            SortProjectTransposeRule.INSTANCE,
+            EnumerableRules.ENUMERABLE_JOIN_RULE,
+            EnumerableRules.ENUMERABLE_PROJECT_RULE,
+            EnumerableRules.ENUMERABLE_SORT_RULE,
+            EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
+    for (String joinType : Arrays.asList("left", "right", "full", "inner")) {
+      String sql =
+          "select e.\"deptno\" from \"hr\".\"emps\" e "
+              + joinType + " join \"hr\".\"depts\" d "
+              + " on e.\"deptno\" = d.\"deptno\" "
+              + "order by e.\"empid\" ";
+      RelNode actualPlan = transform(sql, prepareRules);
+      assertThat(
+          toString(actualPlan),
+          allOf(
+              containsString("EnumerableJoin"),
+              not(containsString("EnumerableSort"))));
+    }
+  }
+
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-2554">[CALCITE-2554]
+   * Enrich enumerable join operators with order preserving information</a>.
+   *
+   * Since join inputs are sorted, and this join preserves the order of the left input, there
+   * shouldn't be any sort operator above the join.
+   */
+  @Test public void removeSortOverEnumerableThetaJoin() throws Exception {
+    RuleSet prepareRules =
+        RuleSets.ofList(
+            SortProjectTransposeRule.INSTANCE,
+            EnumerableRules.ENUMERABLE_JOIN_RULE,
+            EnumerableRules.ENUMERABLE_PROJECT_RULE,
+            EnumerableRules.ENUMERABLE_SORT_RULE,
+            EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
+    // Inner join is not considered since the ENUMERABLE_JOIN_RULE does not generate a theta join
+    // in the case of inner joins.
+    for (String joinType : Arrays.asList("left", "right", "full")) {
+      String sql =
+          "select e.\"deptno\" from \"hr\".\"emps\" e "
+              + joinType + " join \"hr\".\"depts\" d "
+              + " on e.\"deptno\" > d.\"deptno\" "
+              + "order by e.\"empid\" ";
+      RelNode actualPlan = transform(sql, prepareRules);
+      assertThat(
+          toString(actualPlan),
+          allOf(
+              containsString("EnumerableThetaJoin"),
+              not(containsString("EnumerableSort"))));
+    }
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-2554">[CALCITE-2554]
+   * Enrich enumerable join operators with order preserving information</a>.
+   *
+   * Since join inputs are sorted, and this join preserves the order of the left input, there
+   * shouldn't be any sort operator above the join.
+   */
+  @Test public void removeSortOverEnumerableCorrelate() throws Exception {
+    RuleSet prepareRules =
+        RuleSets.ofList(
+            SortProjectTransposeRule.INSTANCE,
+            JoinToCorrelateRule.INSTANCE,
+            EnumerableRules.ENUMERABLE_SORT_RULE,
+            EnumerableRules.ENUMERABLE_PROJECT_RULE,
+            EnumerableRules.ENUMERABLE_CORRELATE_RULE,
+            EnumerableRules.ENUMERABLE_FILTER_RULE,
+            EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
+    for (String joinType : Arrays.asList("left", "inner")) {
+      String sql =
+          "select e.\"deptno\" from \"hr\".\"emps\" e "
+              + joinType + " join \"hr\".\"depts\" d "
+              + " on e.\"deptno\" = d.\"deptno\" "
+              + "order by e.\"empid\" ";
+      RelNode actualPlan = transform(sql, prepareRules);
+      assertThat(
+          toString(actualPlan),
+          allOf(
+              containsString("EnumerableCorrelate"),
+              not(containsString("EnumerableSort"))));
+    }
+  }
+
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-2554">[CALCITE-2554]
+   * Enrich enumerable join operators with order preserving information</a>.
+   *
+   * Since join inputs are sorted, and this join preserves the order of the left input, there
+   * shouldn't be any sort operator above the join.
+   */
+  @Test public void removeSortOverEnumerableSemiJoin() throws Exception {
+    RuleSet prepareRules =
+        RuleSets.ofList(
+            SortProjectTransposeRule.INSTANCE,
+            SemiJoinRule.PROJECT,
+            SemiJoinRule.JOIN,
+            EnumerableRules.ENUMERABLE_PROJECT_RULE,
+            EnumerableRules.ENUMERABLE_SORT_RULE,
+            EnumerableRules.ENUMERABLE_SEMI_JOIN_RULE,
+            EnumerableRules.ENUMERABLE_FILTER_RULE,
+            EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE);
+    String sql =
+        "select e.\"deptno\" from \"hr\".\"emps\" e\n"
+            + " where e.\"deptno\" in (select d.\"deptno\" from \"hr\".\"depts\" d)\n"
+            + " order by e.\"empid\"";
+    RelNode actualPlan = transform(sql, prepareRules);
+    assertThat(
+        toString(actualPlan),
+        allOf(
+            containsString("EnumerableSemiJoin"),
+            not(containsString("EnumerableSort"))));
+  }
+
+  private String toString(RelNode rel) {
+    return Util.toLinux(
+        RelOptUtil.dumpPlan("", rel, SqlExplainFormat.TEXT,
+            SqlExplainLevel.DIGEST_ATTRIBUTES));
+  }
+
+}
+// End SortRemoveRuleTest.java
diff --git a/core/src/test/java/org/apache/calcite/schemas/HrClusteredSchema.java b/core/src/test/java/org/apache/calcite/schemas/HrClusteredSchema.java
new file mode 100644
index 0000000..e405d44
--- /dev/null
+++ b/core/src/test/java/org/apache/calcite/schemas/HrClusteredSchema.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.schemas;
+
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.AbstractSchema;
+import org.apache.calcite.schema.impl.AbstractTable;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * A typical HR schema with employees (emps) and departments (depts) tables that are naturally
+ * ordered based on their primary keys representing clustered tables.
+ */
+public final class HrClusteredSchema extends AbstractSchema {
+
+  private final ImmutableMap<String, Table> tables;
+
+  public HrClusteredSchema() {
+    super();
+    tables = ImmutableMap.<String, Table>builder()
+        .put("emps",
+            new PkClusteredTable(
+                factory ->
+                    new RelDataTypeFactory.Builder(factory)
+                        .add("empid", factory.createJavaType(int.class))
+                        .add("deptno", factory.createJavaType(int.class))
+                        .add("name", factory.createJavaType(String.class))
+                        .add("salary", factory.createJavaType(int.class))
+                        .add("commission", factory.createJavaType(Integer.class))
+                        .build(),
+                ImmutableBitSet.of(0),
+                Arrays.asList(
+                    new Object[]{100, 10, "Bill", 10000, 1000},
+                    new Object[]{110, 10, "Theodore", 11500, 250},
+                    new Object[]{150, 10, "Sebastian", 7000, null},
+                    new Object[]{200, 20, "Eric", 8000, 500})
+            ))
+        .put("depts",
+            new PkClusteredTable(
+                factory ->
+                    new RelDataTypeFactory.Builder(factory)
+                        .add("deptno", factory.createJavaType(int.class))
+                        .add("name", factory.createJavaType(String.class))
+                        .build(),
+                ImmutableBitSet.of(0),
+                Arrays.asList(
+                    new Object[]{10, "Sales"},
+                    new Object[]{30, "Marketing"},
+                    new Object[]{40, "HR"})
+            )).build();
+  }
+
+  @Override protected Map<String, Table> getTableMap() {
+    return tables;
+  }
+
+  /**
+   * A table sorted (ascending direction and nulls last) on the primary key.
+   */
+  private static class PkClusteredTable extends AbstractTable implements ScannableTable {
+    private final ImmutableBitSet pkColumns;
+    private final List<Object[]> data;
+    private final Function<RelDataTypeFactory, RelDataType> typeBuilder;
+
+    PkClusteredTable(
+        Function<RelDataTypeFactory, RelDataType> dataTypeBuilder,
+        ImmutableBitSet pkColumns,
+        List<Object[]> data) {
+      this.data = data;
+      this.typeBuilder = dataTypeBuilder;
+      this.pkColumns = pkColumns;
+    }
+
+    @Override public Statistic getStatistic() {
+      List<RelFieldCollation> collationFields = new ArrayList<>();
+      for (Integer key : pkColumns) {
+        collationFields.add(
+            new RelFieldCollation(
+                key,
+                RelFieldCollation.Direction.ASCENDING,
+                RelFieldCollation.NullDirection.LAST));
+      }
+      return Statistics.of(data.size(), ImmutableList.of(pkColumns),
+          ImmutableList.of(RelCollations.of(collationFields)));
+    }
+
+    @Override public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
+      return typeBuilder.apply(typeFactory);
+    }
+
+    @Override public Enumerable<Object[]> scan(final DataContext root) {
+      return Linq4j.asEnumerable(data);
+    }
+
+  }
+}
+// End HrClusteredSchema.java
diff --git a/core/src/test/java/org/apache/calcite/test/CalciteSuite.java b/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
index aebc9b3..e7ad3d9 100644
--- a/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
+++ b/core/src/test/java/org/apache/calcite/test/CalciteSuite.java
@@ -36,6 +36,7 @@ import org.apache.calcite.rel.RelCollationTest;
 import org.apache.calcite.rel.RelDistributionTest;
 import org.apache.calcite.rel.rel2sql.RelToSqlConverterTest;
 import org.apache.calcite.rel.rules.DateRangeRulesTest;
+import org.apache.calcite.rel.rules.SortRemoveRuleTest;
 import org.apache.calcite.rex.RexBuilderTest;
 import org.apache.calcite.rex.RexExecutorTest;
 import org.apache.calcite.rex.RexSqlStandardConvertletTableTest;
@@ -182,6 +183,7 @@ import org.junit.runners.Suite;
     CoreQuidemTest.class,
     CalciteRemoteDriverTest.class,
     StreamTest.class,
+    SortRemoveRuleTest.class,
 
     // test cases
     TableInRootSchemaTest.class,
diff --git a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
index 7bd85b6..41b6684 100644
--- a/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
+++ b/core/src/test/java/org/apache/calcite/tools/PlannerTest.java
@@ -48,6 +48,8 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rel.rules.FilterMergeRule;
 import org.apache.calcite.rel.rules.ProjectMergeRule;
 import org.apache.calcite.rel.rules.ProjectToWindowRule;
+import org.apache.calcite.rel.rules.SortJoinTransposeRule;
+import org.apache.calcite.rel.rules.SortProjectTransposeRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -393,6 +395,46 @@ public class PlannerTest {
             + "    EnumerableTableScan(table=[[hr, emps]])\n"));
   }
 
+  /** Test case for
+   * <a href="https://issues.apache.org/jira/browse/CALCITE-2554">[CALCITE-2554]
+   * Enrich EnumerableJoin operator with order preserving information</a>.
+   *
+   * Since left input to the join is sorted, and this join preserves order, there shouldn't be
+   * any sort operator above the join.
+   */
+  @Test public void testRedundantSortOnJoinPlan() throws Exception {
+    RuleSet ruleSet =
+        RuleSets.ofList(
+            SortRemoveRule.INSTANCE,
+            SortJoinTransposeRule.INSTANCE,
+            SortProjectTransposeRule.INSTANCE,
+            EnumerableRules.ENUMERABLE_LIMIT_RULE,
+            EnumerableRules.ENUMERABLE_JOIN_RULE,
+            EnumerableRules.ENUMERABLE_PROJECT_RULE,
+            EnumerableRules.ENUMERABLE_SORT_RULE);
+    Planner planner = getPlanner(null, Programs.of(ruleSet));
+    SqlNode parse = planner.parse(
+        "select e.\"deptno\" from \"emps\" e "
+            + "left outer join \"depts\" d "
+            + " on e.\"deptno\" = d.\"deptno\" "
+            + "order by e.\"deptno\" "
+            + "limit 10");
+    SqlNode validate = planner.validate(parse);
+    RelNode convert = planner.rel(validate).rel;
+    RelTraitSet traitSet = convert.getTraitSet()
+        .replace(EnumerableConvention.INSTANCE).simplify();
+    RelNode transform = planner.transform(0, traitSet, convert);
+    assertThat(toString(transform),
+        equalTo("EnumerableProject(deptno=[$1])\n"
+        + "  EnumerableLimit(fetch=[10])\n"
+        + "    EnumerableJoin(condition=[=($1, $5)], joinType=[left])\n"
+        + "      EnumerableLimit(fetch=[10])\n"
+        + "        EnumerableSort(sort0=[$1], dir0=[ASC])\n"
+        + "          EnumerableTableScan(table=[[hr, emps]])\n"
+        + "      EnumerableProject(deptno=[$0], name=[$1], employees=[$2], x=[$3.x], y=[$3.y])\n"
+        + "        EnumerableTableScan(table=[[hr, depts]])\n"));
+  }
+
   /** Unit test that parses, validates, converts and
    * plans for query using two duplicate order by.
    * The duplicate order by should be removed by SortRemoveRule. */
diff --git a/core/src/test/resources/sql/join.iq b/core/src/test/resources/sql/join.iq
index 18a20fe..cda407c 100644
--- a/core/src/test/resources/sql/join.iq
+++ b/core/src/test/resources/sql/join.iq
@@ -281,11 +281,10 @@ order by empno limit 10;
 !ok
 EnumerableCalc(expr#0..10=[{inputs}], expr#11=[COALESCE($t7, $t8)], DEPTNO=[$t11], EMPNO=[$t0], ENAME=[$t1], JOB=[$t2], MGR=[$t3], HIREDATE=[$t4], SAL=[$t5], COMM=[$t6], DNAME=[$t9], LOC=[$t10])
   EnumerableLimit(fetch=[10])
-    EnumerableSort(sort0=[$0], dir0=[ASC])
-      EnumerableJoin(condition=[=($7, $8)], joinType=[left])
-        EnumerableLimit(fetch=[10])
-          EnumerableTableScan(table=[[scott, EMP]])
-        EnumerableTableScan(table=[[scott, DEPT]])
+    EnumerableJoin(condition=[=($7, $8)], joinType=[left])
+      EnumerableLimit(fetch=[10])
+        EnumerableTableScan(table=[[scott, EMP]])
+      EnumerableTableScan(table=[[scott, DEPT]])
 !plan
 
 # End join.iq
diff --git a/core/src/test/resources/sql/sub-query.iq b/core/src/test/resources/sql/sub-query.iq
index 26c0795..f581cec 100644
--- a/core/src/test/resources/sql/sub-query.iq
+++ b/core/src/test/resources/sql/sub-query.iq
@@ -1009,7 +1009,7 @@ EnumerableCalc(expr#0..3=[{inputs}], expr#4=[false], expr#5=[=($t2, $t4)], expr#
     EnumerableLimit(fetch=[1])
       EnumerableSort(sort0=[$0], dir0=[DESC])
         EnumerableAggregate(group=[{0}], c=[COUNT()])
-          EnumerableCalc(expr#0..2=[{inputs}], expr#3=[true], expr#4=[10], expr#5=[=($t4, $t0)], cs=[$t3], $condition=[$t5])
+          EnumerableCalc(expr#0..2=[{inputs}], expr#3=[true], expr#4=[10], expr#5=[CAST($t0):TINYINT], expr#6=[=($t4, $t5)], expr#7=[IS NULL($t5)], expr#8=[OR($t6, $t7)], cs=[$t3], $condition=[$t8])
             EnumerableTableScan(table=[[scott, DEPT]])
 !plan
 
@@ -1259,7 +1259,7 @@ EnumerableCalc(expr#0..3=[{inputs}], expr#4=[IS NULL($t3)], expr#5=[false], expr
     EnumerableLimit(fetch=[1])
       EnumerableSort(sort0=[$0], dir0=[DESC])
         EnumerableAggregate(group=[{0}], c=[COUNT()])
-          EnumerableCalc(expr#0..2=[{inputs}], expr#3=[true], expr#4=[10], expr#5=[=($t4, $t0)], cs=[$t3], $condition=[$t5])
+          EnumerableCalc(expr#0..2=[{inputs}], expr#3=[true], expr#4=[10], expr#5=[CAST($t0):TINYINT], expr#6=[=($t4, $t5)], expr#7=[IS NULL($t5)], expr#8=[OR($t6, $t7)], cs=[$t3], $condition=[$t8])
             EnumerableTableScan(table=[[scott, DEPT]])
 !plan
 
diff --git a/linq4j/src/test/java/org/apache/calcite/linq4j/test/JoinPreserveOrderTest.java b/linq4j/src/test/java/org/apache/calcite/linq4j/test/JoinPreserveOrderTest.java
new file mode 100644
index 0000000..f399a30
--- /dev/null
+++ b/linq4j/src/test/java/org/apache/calcite/linq4j/test/JoinPreserveOrderTest.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j.test;
+
+import org.apache.calcite.linq4j.CorrelateJoinType;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.EnumerableDefaults;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.function.Function2;
+
+import static org.apache.calcite.linq4j.function.Functions.nullsComparator;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+
+/**
+ * Test validating the order preserving properties of join algorithms in
+ * {@link org.apache.calcite.linq4j.ExtendedEnumerable}. The correctness of the join algorithm is
+ * not examined by this set of tests.
+ *
+ * To verify that the order of left/right/both input(s) is preserved they must be all ordered by at
+ * least one column. The inputs are either sorted on the join or some other column. For the tests to
+ * be meaningful the result of the join must not be empty.
+ *
+ * Interesting variants that may affect the join output and thus destroy the order of one or both
+ * inputs is when the join column or the sorted column (when join column != sort column)
+ * contain nulls or duplicate values.
+ *
+ * In addition, the way that nulls are sorted before the join can also play an important role
+ * regarding the order preserving semantics of the join.
+ *
+ * Last but not least, the type of the join (left/right/full/inner/semi/anti) has a major impact on
+ * the preservation of order for the various joins.
+ */
+@RunWith(Parameterized.class)
+public final class JoinPreserveOrderTest {
+
+  /**
+   * A description holding which column must be sorted and how.
+   * @param <T> the type of the input relation
+   */
+  private static class FieldCollationDescription<T> {
+    private final String colName;
+    private final Function1<T, Comparable> colSelector;
+    private final boolean isAscending;
+    private final boolean isNullsFirst;
+
+    FieldCollationDescription(final String colName,
+        final Function1<T, Comparable> colSelector,
+        final boolean isAscending,
+        final boolean isNullsFirst) {
+      this.colName = colName;
+      this.colSelector = colSelector;
+      this.isAscending = isAscending;
+      this.isNullsFirst = isNullsFirst;
+    }
+
+    @Override public String toString() {
+      return "on='" + colName + "', asc=" + isAscending + ", nullsFirst=" + isNullsFirst + '}';
+    }
+  }
+
+  /**
+   * An abstraction for a join algorithm which performs an operation on two inputs and produces a
+   * result.
+   *
+   * @param <L> the type of the left input
+   * @param <R> the type of the right input
+   * @param <Result> the type of the result
+   */
+  private interface JoinAlgorithm<L, R, Result> {
+    Enumerable<Result> join(Enumerable<L> left, Enumerable<R> right);
+  }
+
+  private final FieldCollationDescription<Employee> leftColumn;
+  private final FieldCollationDescription<Department> rightColumn;
+  private static final Function2<Employee, Department, List<Integer>> RESULT_SELECTOR =
+      (emp, dept) -> Arrays.asList(
+          (emp != null) ? emp.eid : null,
+          (dept != null) ? dept.did : null);
+
+  public JoinPreserveOrderTest(
+      final FieldCollationDescription<Employee> leftColumn,
+      final FieldCollationDescription<Department> rightColumn) {
+    this.leftColumn = leftColumn;
+    this.rightColumn = rightColumn;
+  }
+
+  @Parameterized.Parameters(name = "{index}: columnLeft({0}), columnRight({1})")
+  public static Collection<Object[]> data() {
+    List<Object[]> data = new ArrayList<>();
+    List<String> empOrderColNames = Arrays.asList("name", "deptno", "eid");
+    List<Function1<Employee, Comparable>> empOrderColSelectors = Arrays.asList(
+        Employee::getName,
+        Employee::getDeptno,
+        Employee::getEid);
+    List<String> deptOrderColNames = Arrays.asList("name", "deptno", "did");
+    List<Function1<Department, Comparable>> deptOrderColSelectors = Arrays.asList(
+        Department::getName,
+        Department::getDeptno,
+        Department::getDid);
+    List<Boolean> trueFalse = Arrays.asList(true, false);
+    for (int i = 0; i < empOrderColNames.size(); i++) {
+      for (Boolean ascendingL : trueFalse) {
+        for (Boolean nullsFirstL : trueFalse) {
+          for (int j = 0; j < deptOrderColNames.size(); j++) {
+            for (Boolean nullsFirstR : trueFalse) {
+              for (Boolean ascendingR : trueFalse) {
+                Object[] params = new Object[2];
+                params[0] = new FieldCollationDescription<>(
+                    empOrderColNames.get(i),
+                    empOrderColSelectors.get(i),
+                    ascendingL,
+                    nullsFirstL);
+                params[1] = new FieldCollationDescription<>(
+                    deptOrderColNames.get(j),
+                    deptOrderColSelectors.get(j),
+                    ascendingR,
+                    nullsFirstR
+                );
+                data.add(params);
+              }
+            }
+          }
+        }
+      }
+    }
+    return data;
+  }
+
+  @Test public void testLeftJoinPreservesOrderOfLeftInput() {
+    testJoin(hashJoin(false, true), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testRightJoinPreservesOrderOfLeftInput() {
+    Assume.assumeFalse(leftColumn.isNullsFirst);
+    testJoin(hashJoin(true, false), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testFullJoinPreservesOrderOfLeftInput() {
+    Assume.assumeFalse(leftColumn.isNullsFirst);
+    testJoin(hashJoin(true, true), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testInnerJoinPreservesOrderOfLeftInput() {
+    testJoin(hashJoin(false, false), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testLeftThetaJoinPreservesOrderOfLeftInput() {
+    testJoin(thetaJoin(false, true), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testRightThetaJoinPreservesOrderOfLeftInput() {
+    Assume.assumeFalse(leftColumn.isNullsFirst);
+    testJoin(thetaJoin(true, false), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testFullThetaJoinPreservesOrderOfLeftInput() {
+    Assume.assumeFalse(leftColumn.isNullsFirst);
+    testJoin(thetaJoin(true, true), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testInnerThetaJoinPreservesOrderOfLeftInput() {
+    testJoin(thetaJoin(false, false), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+
+  @Test public void testLeftCorrelateJoinPreservesOrderOfLeftInput() {
+    testJoin(correlateJoin(CorrelateJoinType.LEFT), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testInnerCorrelateJoinPreservesOrderOfLeftInput() {
+    testJoin(correlateJoin(CorrelateJoinType.INNER), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testAntiCorrelateJoinPreservesOrderOfLeftInput() {
+    testJoin(correlateJoin(CorrelateJoinType.ANTI), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testSemiCorrelateJoinPreservesOrderOfLeftInput() {
+    testJoin(correlateJoin(CorrelateJoinType.SEMI), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  @Test public void testSemiDefaultJoinPreservesOrderOfLeftInput() {
+    testJoin(semiJoin(), AssertOrder.PRESERVED, AssertOrder.IGNORED);
+  }
+
+  private void testJoin(
+      JoinAlgorithm<Employee, Department, List<Integer>> joinAlgorithm,
+      AssertOrder assertLeftInput, AssertOrder assertRightInput) {
+    Enumerable<Employee> left =
+        Linq4j.asEnumerable(EMPS)
+            .orderBy(leftColumn.colSelector,
+                nullsComparator(leftColumn.isNullsFirst, !leftColumn.isAscending));
+    Enumerable<Department> right =
+        Linq4j.asEnumerable(DEPTS)
+            .orderBy(rightColumn.colSelector,
+                nullsComparator(rightColumn.isNullsFirst, !rightColumn.isAscending));
+    Enumerable<List<Integer>> joinResult = joinAlgorithm.join(left, right);
+
+    List<Integer> actualIdOrderLeft = joinResult.select(joinTuple -> joinTuple.get(0)).toList();
+    List<Integer> expectedIdOrderLeft = left.select(e -> e.eid).toList();
+    assertLeftInput.check(expectedIdOrderLeft, actualIdOrderLeft, leftColumn.isNullsFirst);
+    List<Integer> actualIdOrderRight = joinResult.select(joinTuple -> joinTuple.get(1)).toList();
+    List<Integer> expectedIdOrderRight = right.select(d -> d.did).toList();
+    assertRightInput.check(expectedIdOrderRight, actualIdOrderRight, rightColumn.isNullsFirst);
+  }
+
+  private JoinAlgorithm<Employee, Department, List<Integer>> correlateJoin(
+      CorrelateJoinType joinType) {
+    return (left, right) ->
+        left.correlateJoin(
+            joinType,
+            emp -> right.where(dept ->
+                emp.deptno != null && dept.deptno != null && emp.deptno.equals(dept.deptno)
+            ),
+            RESULT_SELECTOR);
+  }
+
+  private JoinAlgorithm<Employee, Department, List<Integer>> hashJoin(
+      boolean generateNullsOnLeft,
+      boolean generateNullsOnRight) {
+    return (left, right) ->
+        left.join(right,
+            e -> e.deptno,
+            d -> d.deptno,
+            RESULT_SELECTOR,
+            null,
+            generateNullsOnLeft,
+            generateNullsOnRight);
+  }
+
+  private JoinAlgorithm<Employee, Department, List<Integer>> thetaJoin(
+      boolean generateNullsOnLeft,
+      boolean generateNullsOnRight) {
+    return (left, right) ->
+        EnumerableDefaults.thetaJoin(
+            left,
+            right,
+            (emp, dept) ->
+                emp.deptno != null && dept.deptno != null && emp.deptno.equals(dept.deptno),
+            RESULT_SELECTOR,
+            generateNullsOnLeft,
+            generateNullsOnRight);
+  }
+
+  private JoinAlgorithm<Employee, Department, List<Integer>> semiJoin() {
+    return (left, right) ->
+        EnumerableDefaults.semiJoin(
+            left,
+            right,
+            emp -> emp.deptno,
+            dept -> dept.deptno).select(emp -> Arrays.asList(emp.eid, null)
+        );
+  }
+
+  /**
+   * Different assertions for the result of the join.
+   */
+  private enum AssertOrder {
+    PRESERVED {
+      @Override <E> void check(final List<E> expected, final List<E> actual,
+          final boolean nullsFirst) {
+        assertTrue("Order is not preserved. Expected:<" + expected + "> but was:<" + actual + ">",
+            isOrderPreserved(expected, actual, nullsFirst));
+      }
+    },
+    DESTROYED {
+      @Override <E> void check(final List<E> expected, final List<E> actual,
+          final boolean nullsFirst) {
+        assertFalse("Order is not destroyed. Expected:<" + expected + "> but was:<" + actual + ">",
+            isOrderPreserved(expected, actual, nullsFirst));
+      }
+    },
+    IGNORED {
+      @Override <E> void check(final List<E> expected, final List<E> actual,
+          final boolean nullsFirst) {
+        // Do nothing
+      }
+    };
+
+    abstract <E> void check(List<E> expected, List<E> actual, boolean nullsFirst);
+
+    /**
+     * Checks that the elements in the list are in the expected order.
+     */
+    <E> boolean isOrderPreserved(List<E> expected, List<E> actual, boolean nullsFirst) {
+      boolean isPreserved = true;
+      for (int i = 1; i < actual.size(); i++) {
+        E prev = actual.get(i - 1);
+        E next = actual.get(i);
+        int posPrev = prev == null ? (nullsFirst ? -1 : actual.size()) : expected.indexOf(prev);
+        int posNext = next == null ? (nullsFirst ? -1 : actual.size()) : expected.indexOf(next);
+        isPreserved &= posPrev <= posNext;
+      }
+      return isPreserved;
+    }
+  }
+
+  /** Department */
+  private static class Department {
+    private final int did;
+    private final Integer deptno;
+    private final String name;
+
+    Department(final int did, final Integer deptno, final String name) {
+      this.did = did;
+      this.deptno = deptno;
+      this.name = name;
+    }
+
+    int getDid() {
+      return did;
+    }
+
+    Integer getDeptno() {
+      return deptno;
+    }
+
+    String getName() {
+      return name;
+    }
+  }
+
+  /** Employee */
+  private static class Employee {
+    private final int eid;
+    private final String name;
+    private final Integer deptno;
+
+    Employee(final int eid, final String name, final Integer deptno) {
+      this.eid = eid;
+      this.name = name;
+      this.deptno = deptno;
+    }
+
+    int getEid() {
+      return eid;
+    }
+
+    String getName() {
+      return name;
+    }
+
+    Integer getDeptno() {
+      return deptno;
+    }
+
+    @Override public String toString() {
+      return "Employee{eid=" + eid + ", name='" + name + '\'' + ", deptno=" + deptno + '}';
+    }
+  }
+
+  private static final Employee[] EMPS = new Employee[]{
+      new Employee(100, "Stam", 10),
+      new Employee(110, "Greg", 20),
+      new Employee(120, "Ilias", 30),
+      new Employee(130, "Ruben", 40),
+      new Employee(140, "Tanguy", 50),
+      new Employee(150, "Andrew", -10),
+      // Nulls on name
+      new Employee(160, null, 60),
+      new Employee(170, null, -60),
+      // Nulls on deptno
+      new Employee(180, "Achille", null),
+      // Duplicate values on name
+      new Employee(190, "Greg", 70),
+      new Employee(200, "Ilias", -70),
+      // Duplicates values on deptno
+      new Employee(210, "Sophia", 40),
+      new Employee(220, "Alexia", -40),
+      new Employee(230, "Loukia", -40)
+  };
+
+  private static final Department[] DEPTS = new Department[]{
+      new Department(1, 10, "Sales"),
+      new Department(2, 20, "Pre-sales"),
+      new Department(4, 40, "Support"),
+      new Department(5, 50, "Marketing"),
+      new Department(6, 60, "Engineering"),
+      new Department(7, 70, "Management"),
+      new Department(8, 80, "HR"),
+      new Department(9, 90, "Product design"),
+      // Nulls on name
+      new Department(3, 30, null),
+      new Department(10, 100, null),
+      // Nulls on deptno
+      new Department(11, null, "Post-sales"),
+      // Duplicate values on name
+      new Department(12, 50, "Support"),
+      new Department(13, 140, "Support"),
+      // Duplicate values on deptno
+      new Department(14, 20, "Board"),
+      new Department(15, 40, "Promotions"),
+  };
+
+}
+
+// End JoinPreserveOrderTest.java
diff --git a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
index 8a0cbb9..b1a7f20 100644
--- a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
+++ b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
@@ -37,7 +37,8 @@ import org.junit.runners.Suite;
     BlockBuilderTest.class,
     FunctionTest.class,
     TypeTest.class,
-    CorrelateJoinTest.class
+    CorrelateJoinTest.class,
+    JoinPreserveOrderTest.class
     })
 public class Linq4jSuite {
 }