You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/07/13 18:54:09 UTC

[GitHub] [samza] shanthoosh commented on a change in pull request #1384: SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins

shanthoosh commented on a change in pull request #1384:
URL: https://github.com/apache/samza/pull/1384#discussion_r441004901



##########
File path: samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
##########
@@ -52,9 +52,9 @@
   public Collection<RelRoot> convertDsl(String dsl) {
     // TODO: Introduce an API to parse a dsl string and return one or more sql statements
     List<String> sqlStmts = fetchSqlFromConfig(config);
-    QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config));
     List<RelRoot> relRoots = new LinkedList<>();
     for (String sql: sqlStmts) {
+      QueryPlanner planner = getQueryPlanner(getSqlConfig(Collections.singletonList(sql), config));

Review comment:
       What is the rationale for recreating the planner for every sql-statement in the app?

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -140,16 +166,45 @@ public RelRoot plan(String query) {
           .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
           .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
           .traitDefs(traitDefs)
-          .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
+          .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
           .build();
-      Planner planner = Frameworks.getPlanner(frameworkConfig);
+      planner = Frameworks.getPlanner(frameworkConfig);
+      return planner;
+    } catch (Exception e) {
+      String errorMsg = "Failed to create planner.";
+      LOG.error(errorMsg, e);
+      throw new SamzaException(errorMsg, e);
+    }
+  }
 
+  private RelRoot optimize(RelRoot relRoot) {
+    RelTraitSet relTraitSet = RelTraitSet.createEmpty();
+    try {
+      RelRoot optimizedRelRoot =
+          RelRoot.of(getPlanner().transform(0, relTraitSet, relRoot.project()), SqlKind.SELECT);
+      LOG.info("query plan with optimization:\n"

Review comment:
       Nit: 
   1. By default, the logger appends `\n` at the end of each line, so it is unnecessary to add it explicitly.
   2. Also, it's better to capitalize all log messages.

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -129,6 +142,13 @@ public RelRoot plan(String query) {
       sqlOperatorTables.add(new SamzaSqlOperatorTable());
       sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
 
+      // TODO: Introduce a pluggable rule factory.

Review comment:
       It would be better to create a follow-up ticket for this action item.

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.translator.JoinInputNode;
+import org.apache.samza.sql.translator.JoinInputNode.InputType;
+
+/**
+ * Planner rule for remote table joins that pushes filters above and
+ * within a join node into its children nodes.
+ * This class is customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule} for
+ * remote table joins.
+ */
+public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {
+  /** Whether to try to strengthen join-type. */
+  private final boolean smart;
+
+  Map<String, SqlIOConfig> systemStreamConfigBySource;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a SamzaSqlFilterRemoteJoinRule with an explicit root operand and
+   * factories.
+   */
+  protected SamzaSqlFilterRemoteJoinRule(RelOptRuleOperand operand, String id,
+      boolean smart, RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+    super(operand, relBuilderFactory, "SamzaSqlFilterRemoteJoinRule:" + id);
+    this.smart = smart;
+    this.systemStreamConfigBySource = systemStreamConfigBySource;
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  protected void perform(RelOptRuleCall call, Filter filter,
+      Join join) {
+    final List<RexNode> joinFilters =
+        RelOptUtil.conjunctions(join.getCondition());
+
+    boolean donotOptimizeLeft = false;
+    boolean donotOptimizeRight = false;
+
+    JoinInputNode.InputType inputTypeOnLeft =
+        JoinInputNode.getInputType(join.getLeft(), systemStreamConfigBySource);
+    JoinInputNode.InputType inputTypeOnRight =
+        JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);
+
+    // Disable this optimnization for queries using local table.

Review comment:
       nit: %s/optimnization/optimization

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.translator.JoinInputNode;
+import org.apache.samza.sql.translator.JoinInputNode.InputType;
+
+/**
+ * Planner rule for remote table joins that pushes filters above and
+ * within a join node into its children nodes.
+ * This class is customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule} for
+ * remote table joins.
+ */
+public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {
+  /** Whether to try to strengthen join-type. */
+  private final boolean smart;
+
+  Map<String, SqlIOConfig> systemStreamConfigBySource;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a SamzaSqlFilterRemoteJoinRule with an explicit root operand and
+   * factories.
+   */
+  protected SamzaSqlFilterRemoteJoinRule(RelOptRuleOperand operand, String id,
+      boolean smart, RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+    super(operand, relBuilderFactory, "SamzaSqlFilterRemoteJoinRule:" + id);
+    this.smart = smart;
+    this.systemStreamConfigBySource = systemStreamConfigBySource;
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  protected void perform(RelOptRuleCall call, Filter filter,
+      Join join) {
+    final List<RexNode> joinFilters =
+        RelOptUtil.conjunctions(join.getCondition());
+
+    boolean donotOptimizeLeft = false;
+    boolean donotOptimizeRight = false;
+
+    JoinInputNode.InputType inputTypeOnLeft =
+        JoinInputNode.getInputType(join.getLeft(), systemStreamConfigBySource);
+    JoinInputNode.InputType inputTypeOnRight =
+        JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);
+
+    // Disable this optimnization for queries using local table.
+    if (inputTypeOnLeft == InputType.LOCAL_TABLE || inputTypeOnRight == InputType.LOCAL_TABLE) {
+      donotOptimizeLeft = true;
+      donotOptimizeRight = true;
+    }
+
+    // There is nothing to optimize on the remote table side as the lookup needs to happen first before filtering.
+    if (inputTypeOnLeft == InputType.REMOTE_TABLE) {
+      donotOptimizeLeft = true;
+    }
+    if (inputTypeOnRight == InputType.REMOTE_TABLE) {
+      donotOptimizeRight = true;
+    }
+
+    // If there is only the joinRel,
+    // make sure it does not match a cartesian product joinRel
+    // (with "true" condition), otherwise this rule will be applied
+    // again on the new cartesian product joinRel.
+    if (filter == null && joinFilters.isEmpty()) {
+      return;
+    }
+
+    final List<RexNode> aboveFilters =
+        filter != null
+            ? RelOptUtil.conjunctions(filter.getCondition())
+            : new ArrayList<>();
+    final com.google.common.collect.ImmutableList<RexNode> origAboveFilters =
+        com.google.common.collect.ImmutableList.copyOf(aboveFilters);
+
+    // Simplify Outer Joins
+    JoinRelType joinType = join.getJoinType();
+    if (smart
+        && !origAboveFilters.isEmpty()
+        && join.getJoinType() != JoinRelType.INNER) {
+      joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
+    }
+
+    final List<RexNode> leftFilters = new ArrayList<>();
+    final List<RexNode> rightFilters = new ArrayList<>();
+
+    // TODO - add logic to derive additional filters.  E.g., from
+    // (t1.a = 1 AND t2.a = 2) OR (t1.b = 3 AND t2.b = 4), you can
+    // derive table filters:
+    // (t1.a = 1 OR t1.b = 3)
+    // (t2.a = 2 OR t2.b = 4)
+
+    // Try to push down above filters. These are typically where clause
+    // filters. They can be pushed down if they are not on the NULL
+    // generating side.
+    // We do not push into join condition as we do not benefit much. There is also correctness issue
+    // with remote table as we will not have values for the remote table before the join/lookup.
+    boolean filterPushed = false;
+    if (RelOptUtil.classifyFilters(
+        join,
+        aboveFilters,
+        joinType,
+        false, // Let's not push into join filter
+        !joinType.generatesNullsOnLeft() && !donotOptimizeLeft,
+        !joinType.generatesNullsOnRight() && !donotOptimizeRight,
+        joinFilters,
+        leftFilters,
+        rightFilters)) {
+      filterPushed = true;
+    }
+
+    // If no filter got pushed after validate, reset filterPushed flag
+    if (leftFilters.isEmpty()
+        && rightFilters.isEmpty()) {
+      filterPushed = false;
+    }
+
+    boolean isAntiJoin = joinType == JoinRelType.ANTI;
+
+    // Try to push down filters in ON clause. A ON clause filter can only be
+    // pushed down if it does not affect the non-matching set, i.e. it is
+    // not on the side which is preserved.
+    // A ON clause filter of anti-join can not be pushed down.
+    if (!isAntiJoin && RelOptUtil.classifyFilters(
+        join,
+        joinFilters,

Review comment:
       Just curious: could you please clarify whether we should use `aboveFilters` here as the third argument? From the semantics of the classifyFilters method, it seems like the appropriate choice. It might also be better to rename the variable to something clearer (other than aboveFilters).

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -129,6 +142,13 @@ public RelRoot plan(String query) {
       sqlOperatorTables.add(new SamzaSqlOperatorTable());
       sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
 
+      // TODO: Introduce a pluggable rule factory.
+      List<RelOptRule> rules = ImmutableList.of(

Review comment:
       Is there a way to determine whether these rel-rules are applied to a rel-plan, and to emit a metric (or log the plan before/after the optimization) for debugging purposes?

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -140,16 +160,48 @@ public RelRoot plan(String query) {
           .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
           .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
           .traitDefs(traitDefs)
-          .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
+          .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
           .build();
-      Planner planner = Frameworks.getPlanner(frameworkConfig);
+      planner = Frameworks.getPlanner(frameworkConfig);
+      return planner;
+    } catch (Exception e) {
+      String errorMsg = "Failed to create planner.";
+      LOG.error(errorMsg, e);
+      if (planner != null) {
+        planner.close();
+      }
+      throw new SamzaException(errorMsg, e);
+    }
+  }
 
+  private RelRoot optimize(Planner planner, RelRoot relRoot) {
+    RelTraitSet relTraitSet = RelTraitSet.createEmpty();
+    try {
+      RelRoot optimizedRelRoot =
+          RelRoot.of(planner.transform(0, relTraitSet, relRoot.project()), SqlKind.SELECT);
+      LOG.info("query plan with optimization:\n"
+          + RelOptUtil.toString(optimizedRelRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+      return optimizedRelRoot;
+    } catch (Exception e) {
+      String errorMsg =
+          "Error while optimizing query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+      LOG.error(errorMsg, e);
+      planner.close();

Review comment:
       Calling close() here seems unnecessary; the planner is already closed by the caller via try-with-resources.

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
##########
@@ -140,16 +160,48 @@ public RelRoot plan(String query) {
           .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
           .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
           .traitDefs(traitDefs)
-          .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
+          .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
           .build();
-      Planner planner = Frameworks.getPlanner(frameworkConfig);
+      planner = Frameworks.getPlanner(frameworkConfig);
+      return planner;
+    } catch (Exception e) {
+      String errorMsg = "Failed to create planner.";
+      LOG.error(errorMsg, e);
+      if (planner != null) {
+        planner.close();
+      }
+      throw new SamzaException(errorMsg, e);
+    }
+  }
 
+  private RelRoot optimize(Planner planner, RelRoot relRoot) {
+    RelTraitSet relTraitSet = RelTraitSet.createEmpty();
+    try {
+      RelRoot optimizedRelRoot =
+          RelRoot.of(planner.transform(0, relTraitSet, relRoot.project()), SqlKind.SELECT);
+      LOG.info("query plan with optimization:\n"
+          + RelOptUtil.toString(optimizedRelRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+      return optimizedRelRoot;
+    } catch (Exception e) {
+      String errorMsg =
+          "Error while optimizing query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+      LOG.error(errorMsg, e);
+      planner.close();
+      throw new SamzaException(errorMsg, e);
+    }
+  }
+
+  public RelRoot plan(String query) {
+    try (Planner planner = getPlanner()) {
       SqlNode sql = planner.parse(query);
       SqlNode validatedSql = planner.validate(sql);
       RelRoot relRoot = planner.rel(validatedSql);
-      LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
-      return relRoot;
+      LOG.info(
+          "query plan without optimization:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));

Review comment:
       Please capitalize log messages and remove the `\n` at the end (which gets added by default).

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
##########
@@ -165,6 +165,7 @@ private static Planner createPlanner() {
         .traitDefs(traitDefs)
         .context(Contexts.EMPTY_CONTEXT)
         .costFactory(null)
+        //.programs(Programs.CALC_PROGRAM)

Review comment:
       Just curious: why did we comment this out? If it is not necessary, can we please remove it?

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.translator.JoinInputNode;
+import org.apache.samza.sql.translator.JoinInputNode.InputType;
+
+/**
+ * Planner rule for remote table joins that pushes filters above and
+ * within a join node into its children nodes.
+ * This class is customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule} for
+ * remote table joins.
+ */
+public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {
+  /** Whether to try to strengthen join-type. */
+  private final boolean smart;
+
+  Map<String, SqlIOConfig> systemStreamConfigBySource;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a SamzaSqlFilterRemoteJoinRule with an explicit root operand and
+   * factories.
+   */
+  protected SamzaSqlFilterRemoteJoinRule(RelOptRuleOperand operand, String id,
+      boolean smart, RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+    super(operand, relBuilderFactory, "SamzaSqlFilterRemoteJoinRule:" + id);
+    this.smart = smart;
+    this.systemStreamConfigBySource = systemStreamConfigBySource;
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  protected void perform(RelOptRuleCall call, Filter filter,
+      Join join) {
+    final List<RexNode> joinFilters =
+        RelOptUtil.conjunctions(join.getCondition());
+
+    boolean donotOptimizeLeft = false;
+    boolean donotOptimizeRight = false;
+
+    JoinInputNode.InputType inputTypeOnLeft =
+        JoinInputNode.getInputType(join.getLeft(), systemStreamConfigBySource);
+    JoinInputNode.InputType inputTypeOnRight =
+        JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);
+
+    // Disable this optimnization for queries using local table.
+    if (inputTypeOnLeft == InputType.LOCAL_TABLE || inputTypeOnRight == InputType.LOCAL_TABLE) {
+      donotOptimizeLeft = true;
+      donotOptimizeRight = true;
+    }
+
+    // There is nothing to optimize on the remote table side as the lookup needs to happen first before filtering.
+    if (inputTypeOnLeft == InputType.REMOTE_TABLE) {
+      donotOptimizeLeft = true;
+    }
+    if (inputTypeOnRight == InputType.REMOTE_TABLE) {
+      donotOptimizeRight = true;
+    }
+
+    // If there is only the joinRel,
+    // make sure it does not match a cartesian product joinRel
+    // (with "true" condition), otherwise this rule will be applied
+    // again on the new cartesian product joinRel.
+    if (filter == null && joinFilters.isEmpty()) {
+      return;
+    }
+
+    final List<RexNode> aboveFilters =
+        filter != null
+            ? RelOptUtil.conjunctions(filter.getCondition())
+            : new ArrayList<>();
+    final com.google.common.collect.ImmutableList<RexNode> origAboveFilters =
+        com.google.common.collect.ImmutableList.copyOf(aboveFilters);
+
+    // Simplify Outer Joins
+    JoinRelType joinType = join.getJoinType();
+    if (smart
+        && !origAboveFilters.isEmpty()
+        && join.getJoinType() != JoinRelType.INNER) {
+      joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
+    }
+
+    final List<RexNode> leftFilters = new ArrayList<>();
+    final List<RexNode> rightFilters = new ArrayList<>();
+
+    // TODO - add logic to derive additional filters.  E.g., from
+    // (t1.a = 1 AND t2.a = 2) OR (t1.b = 3 AND t2.b = 4), you can
+    // derive table filters:
+    // (t1.a = 1 OR t1.b = 3)
+    // (t2.a = 2 OR t2.b = 4)
+
+    // Try to push down above filters. These are typically where clause
+    // filters. They can be pushed down if they are not on the NULL
+    // generating side.
+    // We do not push into join condition as we do not benefit much. There is also correctness issue
+    // with remote table as we will not have values for the remote table before the join/lookup.
+    boolean filterPushed = false;
+    if (RelOptUtil.classifyFilters(
+        join,
+        aboveFilters,
+        joinType,
+        false, // Let's not push into join filter
+        !joinType.generatesNullsOnLeft() && !donotOptimizeLeft,
+        !joinType.generatesNullsOnRight() && !donotOptimizeRight,
+        joinFilters,
+        leftFilters,
+        rightFilters)) {
+      filterPushed = true;
+    }
+
+    // If no filter got pushed after validate, reset filterPushed flag
+    if (leftFilters.isEmpty()
+        && rightFilters.isEmpty()) {
+      filterPushed = false;
+    }
+
+    boolean isAntiJoin = joinType == JoinRelType.ANTI;
+
+    // Try to push down filters in ON clause. A ON clause filter can only be
+    // pushed down if it does not affect the non-matching set, i.e. it is
+    // not on the side which is preserved.
+    // A ON clause filter of anti-join can not be pushed down.
+    if (!isAntiJoin && RelOptUtil.classifyFilters(
+        join,
+        joinFilters,
+        joinType,
+        false,
+        !joinType.generatesNullsOnLeft() && !donotOptimizeLeft,
+        !joinType.generatesNullsOnRight() && !donotOptimizeRight,
+        joinFilters,
+        leftFilters,

Review comment:
       `leftFilters` and `rightFilters` are initialized but never visibly modified, which makes it very hard to find where they're populated. Please add a comment here noting that leftFilters and rightFilters are populated by this classifyFilters method.

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.translator.JoinInputNode;
+import org.apache.samza.sql.translator.JoinInputNode.InputType;
+
+/**
+ * Planner rule for remote table joins that pushes filters above and
+ * within a join node into its children nodes.
+ * This class is customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule} for
+ * remote table joins.
+ */
+public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {
+  /** Whether to try to strengthen join-type. */
+  private final boolean smart;
+
+  Map<String, SqlIOConfig> systemStreamConfigBySource;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a SamzaSqlFilterRemoteJoinRule with an explicit root operand and
+   * factories.
+   */
+  protected SamzaSqlFilterRemoteJoinRule(RelOptRuleOperand operand, String id,
+      boolean smart, RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+    super(operand, relBuilderFactory, "SamzaSqlFilterRemoteJoinRule:" + id);
+    this.smart = smart;
+    this.systemStreamConfigBySource = systemStreamConfigBySource;
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  protected void perform(RelOptRuleCall call, Filter filter,
+      Join join) {
+    final List<RexNode> joinFilters =
+        RelOptUtil.conjunctions(join.getCondition());
+
+    boolean donotOptimizeLeft = false;
+    boolean donotOptimizeRight = false;
+
+    JoinInputNode.InputType inputTypeOnLeft =
+        JoinInputNode.getInputType(join.getLeft(), systemStreamConfigBySource);
+    JoinInputNode.InputType inputTypeOnRight =
+        JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);
+
+    // Disable this optimnization for queries using local table.
+    if (inputTypeOnLeft == InputType.LOCAL_TABLE || inputTypeOnRight == InputType.LOCAL_TABLE) {
+      donotOptimizeLeft = true;
+      donotOptimizeRight = true;
+    }
+
+    // There is nothing to optimize on the remote table side as the lookup needs to happen first before filtering.
+    if (inputTypeOnLeft == InputType.REMOTE_TABLE) {
+      donotOptimizeLeft = true;
+    }
+    if (inputTypeOnRight == InputType.REMOTE_TABLE) {
+      donotOptimizeRight = true;
+    }
+
+    // If there is only the joinRel,
+    // make sure it does not match a cartesian product joinRel
+    // (with "true" condition), otherwise this rule will be applied
+    // again on the new cartesian product joinRel.
+    if (filter == null && joinFilters.isEmpty()) {
+      return;
+    }
+
+    final List<RexNode> aboveFilters =
+        filter != null
+            ? RelOptUtil.conjunctions(filter.getCondition())
+            : new ArrayList<>();
+    final com.google.common.collect.ImmutableList<RexNode> origAboveFilters =

Review comment:
       Please import the ImmutableList class instead of hardcoding the package path. There are multiple occurrences in this file and elsewhere.

##########
File path: samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.translator.JoinInputNode;
+import org.apache.samza.sql.translator.JoinInputNode.InputType;
+
+/**
+ * Planner rule for remote table joins that pushes filters above and
+ * within a join node into its children nodes.
+ * This class is customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule} for
+ * remote table joins.
+ */
+public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {

Review comment:
       Just curious: there seems to be considerable duplication with Calcite's native FilterJoinRule class. Since [CALCITE-3170](https://issues.apache.org/jira/browse/CALCITE-3170), Calcite natively supports pushing down conditions on anti-joins. If we upgrade to the Calcite 1.21.0 planner, wouldn't overriding `match` suffice here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org