You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2020/07/20 06:03:08 UTC

[samza] branch master updated: SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins (#1384)

This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f665824  SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins (#1384)
f665824 is described below

commit f665824c1043bc3088bb629cedcabb37e8e91b4d
Author: Aditya Toomula <at...@linkedin.com>
AuthorDate: Sun Jul 19 23:03:00 2020 -0700

    SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins (#1384)
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
    
    * Fix checkstyle errors
    
    * Fix checkstyle errors
    
    * Fix checkstyle errors
    
    * SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
---
 .../apache/samza/sql/dsl/SamzaSqlDslConverter.java |   4 +-
 .../org/apache/samza/sql/planner/QueryPlanner.java |  70 ++++-
 .../sql/planner/SamzaSqlFilterRemoteJoinRule.java  | 261 ++++++++++++++++
 .../samza/sql/planner/SamzaSqlValidator.java       |   7 +-
 .../sql/runner/SamzaSqlApplicationConfig.java      |   7 +
 .../apache/samza/sql/translator/JoinInputNode.java |  40 ++-
 .../samza/sql/translator/JoinTranslator.java       |  49 +--
 .../samza/sql/translator/QueryTranslator.java      |   2 +-
 .../apache/samza/sql/util/SamzaSqlQueryParser.java |   1 +
 .../apache/samza/sql/planner/TestQueryPlanner.java | 345 +++++++++++++++++++++
 .../test/samzasql/TestSamzaSqlRemoteTable.java     | 118 ++++++-
 11 files changed, 851 insertions(+), 53 deletions(-)

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index b09d3d6..acc5b42 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -52,9 +52,9 @@ public class SamzaSqlDslConverter implements DslConverter {
   public Collection<RelRoot> convertDsl(String dsl) {
     // TODO: Introduce an API to parse a dsl string and return one or more sql statements
     List<String> sqlStmts = fetchSqlFromConfig(config);
-    QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config));
     List<RelRoot> relRoots = new LinkedList<>();
     for (String sql: sqlStmts) {
+      QueryPlanner planner = getQueryPlanner(getSqlConfig(Collections.singletonList(sql), config));
       // we always pass only select query to the planner for samza sql. The reason is that samza sql supports
       // schema evolution where source and destination could up to an extent have independent schema evolution while
       // calcite expects strict comformance of the destination schema with that of the fields in the select query.
@@ -87,7 +87,7 @@ public class SamzaSqlDslConverter implements DslConverter {
    */
   public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig sqlConfig) {
     return new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
-        sqlConfig.getUdfMetadata());
+        sqlConfig.getUdfMetadata(), sqlConfig.isQueryPlanOptimizerEnabled());
   }
 
   /**
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 8990bf1..767df43 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.planner;
 
+import com.google.common.collect.ImmutableList;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.ArrayList;
@@ -28,12 +29,17 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollationTraitDef;
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rel.rules.ProjectMergeRule;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.SchemaPlus;
@@ -41,6 +47,7 @@ import org.apache.calcite.schema.Table;
 import org.apache.calcite.schema.impl.AbstractSchema;
 import org.apache.calcite.schema.impl.AbstractTable;
 import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -50,6 +57,7 @@ import org.apache.calcite.sql2rel.SqlToRelConverter;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.Programs;
 import org.apache.samza.SamzaException;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
@@ -76,11 +84,15 @@ public class QueryPlanner {
   // Mapping between the source to the SqlIOConfig corresponding to the source.
   private final Map<String, SqlIOConfig> systemStreamConfigBySource;
 
+  private final boolean isQueryPlanOptimizerEnabled;
+
   public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
-      Map<String, SqlIOConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
+      Map<String, SqlIOConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata,
+      boolean isQueryPlanOptimizerEnabled) {
     this.relSchemaProviders = relSchemaProviders;
     this.systemStreamConfigBySource = systemStreamConfigBySource;
     this.udfMetadata = udfMetadata;
+    this.isQueryPlanOptimizerEnabled = isQueryPlanOptimizerEnabled;
   }
 
   private void registerSourceSchemas(SchemaPlus rootSchema) {
@@ -109,7 +121,8 @@ public class QueryPlanner {
     }
   }
 
-  public RelRoot plan(String query) {
+  private Planner getPlanner() {
+    Planner planner = null;
     try {
       Connection connection = DriverManager.getConnection("jdbc:calcite:");
       CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
@@ -117,7 +130,7 @@ public class QueryPlanner {
       registerSourceSchemas(rootSchema);
 
       List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
-          .map(x -> new SamzaSqlScalarFunctionImpl(x))
+          .map(SamzaSqlScalarFunctionImpl::new)
           .collect(Collectors.toList());
 
       final List<RelTraitDef> traitDefs = new ArrayList<>();
@@ -129,6 +142,13 @@ public class QueryPlanner {
       sqlOperatorTables.add(new SamzaSqlOperatorTable());
       sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
 
+      // TODO: Introduce a pluggable rule factory.
+      List<RelOptRule> rules = ImmutableList.of(
+          FilterProjectTransposeRule.INSTANCE,
+          ProjectMergeRule.INSTANCE,
+          new SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true, RelFactories.LOGICAL_BUILDER,
+          systemStreamConfigBySource));
+
       // Using lenient so that !=,%,- are allowed.
       FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
           .parserConfig(SqlParser.configBuilder()
@@ -140,16 +160,48 @@ public class QueryPlanner {
           .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
           .sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
           .traitDefs(traitDefs)
-          .context(Contexts.EMPTY_CONTEXT)
-          .costFactory(null)
+          .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
           .build();
-      Planner planner = Frameworks.getPlanner(frameworkConfig);
+      planner = Frameworks.getPlanner(frameworkConfig);
+      return planner;
+    } catch (Exception e) {
+      String errorMsg = "Failed to create planner.";
+      LOG.error(errorMsg, e);
+      if (planner != null) {
+        planner.close();
+      }
+      throw new SamzaException(errorMsg, e);
+    }
+  }
 
+  /** Applies program 0 of the planner to the plan; logs and rethrows any failure as a SamzaException. */
+  private RelRoot optimize(Planner planner, RelRoot relRoot) {
+    final RelTraitSet emptyTraits = RelTraitSet.createEmpty();
+    try {
+      final RelRoot optimized = RelRoot.of(planner.transform(0, emptyTraits, relRoot.project()), SqlKind.SELECT);
+      final String optimizedPlan = RelOptUtil.toString(optimized.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+      LOG.info("query plan with optimization:\n" + optimizedPlan);
+      return optimized;
+    } catch (Exception cause) {
+      final String failure = "Error while optimizing query plan:\n"
+          + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+      LOG.error(failure, cause);
+      throw new SamzaException(failure, cause);
+    }
+  }
+
+  public RelRoot plan(String query) {
+    try (Planner planner = getPlanner()) {
       SqlNode sql = planner.parse(query);
       SqlNode validatedSql = planner.validate(sql);
       RelRoot relRoot = planner.rel(validatedSql);
-      LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
-      return relRoot;
+      LOG.info(
+          "query plan without optimization:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+      if (!isQueryPlanOptimizerEnabled) {
+        LOG.info("Skipping query optimization as it is disabled.");
+        return relRoot;
+      }
+      return optimize(planner, relRoot);
     } catch (Exception e) {
       String errorMsg = SamzaSqlValidator.formatErrorString(query, e);
       LOG.error(errorMsg, e);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
new file mode 100644
index 0000000..fc84f65
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.translator.JoinInputNode;
+import org.apache.samza.sql.translator.JoinInputNode.InputType;
+
+/**
+ * Planner rule for remote table joins that pushes filters above and
+ * within a join node into its children nodes.
+ * This class is a customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule} for
+ * remote table joins.
+ */
+public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {
+  /** Whether to try to strengthen join-type. */
+  private final boolean smart;
+
+  Map<String, SqlIOConfig> systemStreamConfigBySource;
+
+  //~ Constructors -----------------------------------------------------------
+
+  /**
+   * Creates a SamzaSqlFilterRemoteJoinRule with an explicit root operand and builder factory; the rule
+   * description passed to the superclass is "SamzaSqlFilterRemoteJoinRule:" followed by {@code id}.
+   */
+  protected SamzaSqlFilterRemoteJoinRule(RelOptRuleOperand operand, String id,
+      boolean smart, RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+    super(operand, relBuilderFactory, "SamzaSqlFilterRemoteJoinRule:" + id);
+    this.smart = smart;
+    this.systemStreamConfigBySource = systemStreamConfigBySource;
+  }
+
+  //~ Methods ----------------------------------------------------------------
+
+  /**
+   * Pushes conjuncts of {@code filter} and of the join condition into the inputs of a remote table join.
+   * Nothing is pushed to a remote table input, and nothing at all when either input is a local table.
+   * If any rewrite applies, the new plan is registered via {@code call.transformTo}.
+   */
+  protected void perform(RelOptRuleCall call, Filter filter, Join join) {
+    final List<RexNode> joinFilters = RelOptUtil.conjunctions(join.getCondition());
+
+    boolean donotOptimizeLeft = false;
+    boolean donotOptimizeRight = false;
+
+    JoinInputNode.InputType inputTypeOnLeft =
+        JoinInputNode.getInputType(join.getLeft(), systemStreamConfigBySource);
+    JoinInputNode.InputType inputTypeOnRight =
+        JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);
+
+    // Disable this optimization for queries using local table.
+    if (inputTypeOnLeft == InputType.LOCAL_TABLE || inputTypeOnRight == InputType.LOCAL_TABLE) {
+      donotOptimizeLeft = true;
+      donotOptimizeRight = true;
+    }
+
+    // There is nothing to optimize on the remote table side as the lookup needs to happen first before filtering.
+    if (inputTypeOnLeft == InputType.REMOTE_TABLE) {
+      donotOptimizeLeft = true;
+    }
+    if (inputTypeOnRight == InputType.REMOTE_TABLE) {
+      donotOptimizeRight = true;
+    }
+
+    // If there is only the joinRel,
+    // make sure it does not match a cartesian product joinRel
+    // (with "true" condition), otherwise this rule will be applied
+    // again on the new cartesian product joinRel.
+    if (filter == null && joinFilters.isEmpty()) {
+      return;
+    }
+
+    final List<RexNode> aboveFilters =
+        filter != null ? RelOptUtil.conjunctions(filter.getCondition()) : new ArrayList<>();
+    final ImmutableList<RexNode> origAboveFilters = ImmutableList.copyOf(aboveFilters);
+
+    // Simplify Outer Joins
+    JoinRelType joinType = join.getJoinType();
+    if (smart
+        && !origAboveFilters.isEmpty()
+        && join.getJoinType() != JoinRelType.INNER) {
+      joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
+    }
+
+    final List<RexNode> leftFilters = new ArrayList<>();
+    final List<RexNode> rightFilters = new ArrayList<>();
+
+    // TODO - add logic to derive additional filters.  E.g., from
+    // (t1.a = 1 AND t2.a = 2) OR (t1.b = 3 AND t2.b = 4), you can
+    // derive table filters:
+    // (t1.a = 1 OR t1.b = 3)
+    // (t2.a = 2 OR t2.b = 4)
+
+    // Try to push down above filters. These are typically where clause
+    // filters. They can be pushed down if they are not on the NULL
+    // generating side.
+    // We do not push into join condition as we do not benefit much. There is also correctness issue
+    // with remote table as we will not have values for the remote table before the join/lookup.
+    // leftFilters and rightFilters are populated in classifyFilters API.
+    boolean filterPushed = false;
+    if (RelOptUtil.classifyFilters(
+        join,
+        aboveFilters,
+        joinType,
+        false, // Let's not push into join filter
+        !joinType.generatesNullsOnLeft() && !donotOptimizeLeft,
+        !joinType.generatesNullsOnRight() && !donotOptimizeRight,
+        joinFilters,
+        leftFilters,
+        rightFilters)) {
+      filterPushed = true;
+    }
+
+    // If no filter got pushed after validate, reset filterPushed flag
+    if (leftFilters.isEmpty()
+        && rightFilters.isEmpty()) {
+      filterPushed = false;
+    }
+
+    boolean isAntiJoin = joinType == JoinRelType.ANTI;
+
+    // Try to push down filters in ON clause. An ON clause filter can only be
+    // pushed down if it does not affect the non-matching set, i.e. it is
+    // not on the side which is preserved (hence the null-generation checks are
+    // swapped relative to the WHERE clause case). Never pushed for an anti-join.
+    if (!isAntiJoin && RelOptUtil.classifyFilters(
+        join,
+        joinFilters,
+        joinType,
+        false,
+        !joinType.generatesNullsOnRight() && !donotOptimizeLeft,
+        !joinType.generatesNullsOnLeft() && !donotOptimizeRight,
+        joinFilters,
+        leftFilters,
+        rightFilters)) {
+      filterPushed = true;
+    }
+
+    // if nothing actually got pushed and there is nothing leftover,
+    // then this rule is a no-op
+    if ((!filterPushed
+        && joinType == join.getJoinType())
+        || (joinFilters.isEmpty()
+        && leftFilters.isEmpty()
+        && rightFilters.isEmpty())) {
+      return;
+    }
+
+    // create Filters on top of the children if any filters were
+    // pushed to them
+    final RexBuilder rexBuilder = join.getCluster().getRexBuilder();
+    final RelBuilder relBuilder = call.builder();
+
+    final RelNode leftRel = relBuilder.push(join.getLeft()).filter(leftFilters).build();
+    final RelNode rightRel = relBuilder.push(join.getRight()).filter(rightFilters).build();
+
+    // create the new join node referencing the new children and
+    // containing its new join filters (if there are any)
+    final ImmutableList<RelDataType> fieldTypes =
+        ImmutableList.<RelDataType>builder()
+            .addAll(RelOptUtil.getFieldTypeList(leftRel.getRowType()))
+            .addAll(RelOptUtil.getFieldTypeList(rightRel.getRowType())).build();
+    final RexNode joinFilter =
+        RexUtil.composeConjunction(rexBuilder,
+            RexUtil.fixUp(rexBuilder, joinFilters, fieldTypes));
+
+    // If nothing actually got pushed and there is nothing leftover,
+    // then this rule is a no-op
+    if (joinFilter.isAlwaysTrue()
+        && leftFilters.isEmpty()
+        && rightFilters.isEmpty()
+        && joinType == join.getJoinType()) {
+      return;
+    }
+
+    RelNode newJoinRel =
+        join.copy(
+            join.getTraitSet(),
+            joinFilter,
+            leftRel,
+            rightRel,
+            joinType,
+            join.isSemiJoinDone());
+    call.getPlanner().onCopy(join, newJoinRel);
+    if (!leftFilters.isEmpty()) {
+      call.getPlanner().onCopy(filter, leftRel);
+    }
+    if (!rightFilters.isEmpty()) {
+      call.getPlanner().onCopy(filter, rightRel);
+    }
+
+    relBuilder.push(newJoinRel);
+
+    // Create a project on top of the join if some of the columns have become
+    // NOT NULL due to the join-type getting stricter.
+    relBuilder.convert(join.getRowType(), false);
+
+    // create a FilterRel on top of the join if needed
+    relBuilder.filter(
+        RexUtil.fixUp(rexBuilder, aboveFilters,
+            RelOptUtil.getFieldTypeList(relBuilder.peek().getRowType())));
+
+    call.transformTo(relBuilder.build());
+  }
+
+  /** Rule that matches a Filter on top of a Join and tries to push stream-side filter expressions into the join inputs. */
+  public static class SamzaSqlFilterIntoRemoteJoinRule extends SamzaSqlFilterRemoteJoinRule {
+    public SamzaSqlFilterIntoRemoteJoinRule(boolean smart,
+        RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+      super(
+          operand(Filter.class,
+              operand(Join.class, RelOptRule.any())),
+          "SamzaSqlFilterRemoteJoinRule:filter", smart, relBuilderFactory, systemStreamConfigBySource);
+    }
+
+    @Override public void onMatch(RelOptRuleCall call) {
+      Filter filter = call.rel(0);
+      Join join = call.rel(1);
+      perform(call, filter, join);
+    }
+  }
+}
+
+// End SamzaSqlFilterRemoteJoinRule.java
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index 760da6d..f5a7e67 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -20,6 +20,7 @@
 package org.apache.samza.sql.planner;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
@@ -69,10 +70,10 @@ public class SamzaSqlValidator {
    * @throws SamzaSqlValidatorException exception for sql validation
    */
   public void validate(List<String> sqlStmts) throws SamzaSqlValidatorException {
-    SamzaSqlApplicationConfig sqlConfig = SamzaSqlDslConverter.getSqlConfig(sqlStmts, config);
-    QueryPlanner planner = SamzaSqlDslConverter.getQueryPlanner(sqlConfig);
-
     for (String sql: sqlStmts) {
+      SamzaSqlApplicationConfig sqlConfig = SamzaSqlDslConverter.getSqlConfig(Collections.singletonList(sql), config);
+      QueryPlanner planner = SamzaSqlDslConverter.getQueryPlanner(sqlConfig);
+
       // we always pass only select query to the planner for samza sql. The reason is that samza sql supports
       // schema evolution where source and destination could up to an extent have independent schema evolution while
       // calcite expects strict conformance of the destination schema with that of the fields in the select query.
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 4d52469..f98879c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -90,6 +90,7 @@ public class SamzaSqlApplicationConfig {
   public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix";
   public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
   public static final String CFG_SQL_PROCESS_SYSTEM_EVENTS = "samza.sql.processSystemEvents";
+  public static final String CFG_SQL_ENABLE_PLAN_OPTIMIZER = "samza.sql.enablePlanOptimizer";
 
   public static final String SAMZA_SYSTEM_LOG = "log";
 
@@ -117,6 +118,7 @@ public class SamzaSqlApplicationConfig {
   private final String metadataTopicPrefix;
   private final long windowDurationMs;
   private final boolean processSystemEvents;
+  private final boolean enableQueryPlanOptimizer;
 
   public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams,
       List<String> outputSystemStreams) {
@@ -170,6 +172,7 @@ public class SamzaSqlApplicationConfig {
 
     processSystemEvents = staticConfig.getBoolean(CFG_SQL_PROCESS_SYSTEM_EVENTS, true);
     windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
+    enableQueryPlanOptimizer = staticConfig.getBoolean(CFG_SQL_ENABLE_PLAN_OPTIMIZER, true);
   }
 
   public static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
@@ -336,4 +339,8 @@ public class SamzaSqlApplicationConfig {
   public boolean isProcessSystemEvents() {
     return processSystemEvents;
   }
+
+  public boolean isQueryPlanOptimizerEnabled() {
+    return enableQueryPlanOptimizer;
+  }
 }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
index d952194..1a0a7ec 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
@@ -20,14 +20,21 @@
 package org.apache.samza.sql.translator;
 
 import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 
 
 /**
  * This class represents the input node for the join. It can be either a table or a stream.
  */
-class JoinInputNode {
+public class JoinInputNode {
 
   // Calcite RelNode corresponding to the input
   private final RelNode relNode;
@@ -37,7 +44,7 @@ class JoinInputNode {
   private final InputType inputType;
   private final boolean isPosOnRight;
 
-  enum InputType {
+  public enum InputType {
     STREAM,
     LOCAL_TABLE,
     REMOTE_TABLE
@@ -73,4 +80,33 @@ class JoinInputNode {
   boolean isPosOnRight() {
     return isPosOnRight;
   }
+
+  public static JoinInputNode.InputType getInputType(
+      RelNode relNode, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+    // Classifies a join input as STREAM, LOCAL_TABLE or REMOTE_TABLE based on the table descriptor of its source.
+    // NOTE: Any intermediate form of a join is always a stream. E.g., for the second level join of a
+    // stream-table-table join, the left side of the join is the join output, which we always
+    // assume to be a stream. The intermediate stream won't be an instance of TableScan.
+    // The join key(s) for the table could be a UDF, in which case the relNode would be a LogicalProject.
+
+    // If the relNode is a vertex in a DAG, get the real relNode. This happens due to query optimization.
+    if (relNode instanceof HepRelVertex) {
+      relNode = ((HepRelVertex) relNode).getCurrentRel();
+    }
+
+    if (relNode instanceof TableScan || relNode instanceof LogicalProject || relNode instanceof LogicalFilter) {
+      SqlIOConfig sourceTableConfig = JoinTranslator.resolveSQlIOForTable(relNode, systemStreamConfigBySource);
+      if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
+        return JoinInputNode.InputType.STREAM;
+      } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
+          sourceTableConfig.getTableDescriptor().get() instanceof CachingTableDescriptor) {
+        return JoinInputNode.InputType.REMOTE_TABLE;
+      } else {
+        return JoinInputNode.InputType.LOCAL_TABLE;
+      }
+    } else {
+      return JoinInputNode.InputType.STREAM;
+    }
+  }
+
 }
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index b05c468..02c434d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -26,7 +26,9 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.Map;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.TableScan;
@@ -54,8 +56,6 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.descriptors.CachingTableDescriptor;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,8 +94,10 @@ class JoinTranslator {
   }
 
   void translate(final LogicalJoin join, final TranslatorContext translatorContext) {
-    JoinInputNode.InputType inputTypeOnLeft = getInputType(join.getLeft(), translatorContext);
-    JoinInputNode.InputType inputTypeOnRight = getInputType(join.getRight(), translatorContext);
+    JoinInputNode.InputType inputTypeOnLeft = JoinInputNode.getInputType(join.getLeft(),
+        translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
+    JoinInputNode.InputType inputTypeOnRight = JoinInputNode.getInputType(join.getRight(),
+        translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
 
     // Do the validation of join query
     validateJoinQuery(join, inputTypeOnLeft, inputTypeOnRight);
@@ -358,14 +360,19 @@ class JoinTranslator {
         SqlExplainLevel.EXPPLAN_ATTRIBUTES);
   }
 
-  private SqlIOConfig resolveSQlIOForTable(RelNode relNode, TranslatorContext context) {
+  static SqlIOConfig resolveSQlIOForTable(RelNode relNode, Map<String, SqlIOConfig> systemStreamConfigBySource) {
     // Let's recursively get to the TableScan node to identify IO for the table.
+
+    if (relNode instanceof HepRelVertex) {
+      return resolveSQlIOForTable(((HepRelVertex) relNode).getCurrentRel(), systemStreamConfigBySource);
+    }
+
     if (relNode instanceof LogicalProject) {
-      return resolveSQlIOForTable(((LogicalProject) relNode).getInput(), context);
+      return resolveSQlIOForTable(((LogicalProject) relNode).getInput(), systemStreamConfigBySource);
     }
 
     if (relNode instanceof LogicalFilter) {
-      return resolveSQlIOForTable(((LogicalFilter) relNode).getInput(), context);
+      return resolveSQlIOForTable(((LogicalFilter) relNode).getInput(), systemStreamConfigBySource);
     }
 
     // We return null for table IO as the table seems to be involved in another join. The output of stream-table join
@@ -380,39 +387,17 @@ class JoinTranslator {
     }
 
     String sourceName = SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
-    SqlIOConfig sourceConfig =
-        context.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource().get(sourceName);
+    SqlIOConfig sourceConfig = systemStreamConfigBySource.get(sourceName);
     if (sourceConfig == null) {
       throw new SamzaException("Unsupported source found in join statement: " + sourceName);
     }
     return sourceConfig;
   }
 
-  private JoinInputNode.InputType getInputType(RelNode relNode, TranslatorContext context) {
-
-    // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
-    // stream-table-table join, the left side of the join is join output, which we always
-    // assume to be a stream. The intermediate stream won't be an instance of TableScan.
-    // The join key(s) for the table could be an udf in which case the relNode would be LogicalProject.
-
-    if (relNode instanceof TableScan || relNode instanceof LogicalProject) {
-      SqlIOConfig sourceTableConfig = resolveSQlIOForTable(relNode, context);
-      if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
-        return JoinInputNode.InputType.STREAM;
-      } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
-          sourceTableConfig.getTableDescriptor().get() instanceof CachingTableDescriptor) {
-        return JoinInputNode.InputType.REMOTE_TABLE;
-      } else {
-        return JoinInputNode.InputType.LOCAL_TABLE;
-      }
-    } else {
-      return JoinInputNode.InputType.STREAM;
-    }
-  }
-
   private Table getTable(JoinInputNode tableNode, TranslatorContext context) {
 
-    SqlIOConfig sourceTableConfig = resolveSQlIOForTable(tableNode.getRelNode(), context);
+    SqlIOConfig sourceTableConfig = resolveSQlIOForTable(tableNode.getRelNode(),
+        context.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
 
     if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
       String errMsg = "Failed to resolve table source in join operation: node=" + tableNode.getRelNode();
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 8247921..405fb2c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -192,7 +192,7 @@ public class QueryTranslator {
   void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
     QueryPlanner planner =
         new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
-            sqlConfig.getUdfMetadata());
+            sqlConfig.getUdfMetadata(), sqlConfig.isQueryPlanOptimizerEnabled());
     final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
     SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
     TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
index f015d2a..4a6390b 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -165,6 +165,7 @@ public class SamzaSqlQueryParser {
         .traitDefs(traitDefs)
         .context(Contexts.EMPTY_CONTEXT)
         .costFactory(null)
+        //.programs(Programs.CALC_PROGRAM)
         .build();
     return Frameworks.getPlanner(frameworkConfig);
   }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
new file mode 100644
index 0000000..2a9dc13
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
@@ -0,0 +1,345 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
+import org.apache.samza.sql.interfaces.DslConverter;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestQueryPlanner {
+
+  @Test
+  public void testTranslate() {
+    // Smoke test: a single insert statement should be planned into exactly one RelRoot.
+    final String query =
+        "Insert into testavro.outputTopic(id) select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10";
+    final Map<String, String> rawConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    rawConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, query);
+    final Config resolvedConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(rawConfigs));
+    final DslConverter converter = new SamzaSqlDslConverterFactory().create(resolvedConfig);
+    final Collection<RelRoot> plannedRoots = converter.convertDsl(query);
+    assertEquals(1, plannedRoots.size());
+  }
+
+  @Test
+  public void testRemoteJoinWithFilter() throws SamzaSqlValidatorException {
+    testRemoteJoinWithFilterHelper(false); // enableOptimizer = false
+  }
+
+  @Test
+  public void testRemoteJoinWithUdfAndFilter() throws SamzaSqlValidatorException {
+    testRemoteJoinWithUdfAndFilterHelper(false); // enableOptimizer = false
+  }
+
+  @Test
+  public void testRemoteJoinWithFilterAndOptimizer() throws SamzaSqlValidatorException {
+    testRemoteJoinWithFilterHelper(true); // enableOptimizer = true
+  }
+
+  @Test
+  public void testRemoteJoinWithUdfAndFilterAndOptimizer() throws SamzaSqlValidatorException {
+    testRemoteJoinWithUdfAndFilterHelper(true); // enableOptimizer = true
+  }
+
+  void testRemoteJoinWithFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
+    // Plans a join with the PAGEVIEW source on the left and the testRemoteStore Profile table on the right.
+    final String query =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testavro.PAGEVIEW as pv "
+            + "join testRemoteStore.Profile.`$table` as p "
+            + " on p.__key__ = pv.profileId"
+            + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = 1";
+
+    final Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, query);
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
+
+    final DslConverter converter = new SamzaSqlDslConverterFactory().create(new MapConfig(configs));
+    final Collection<RelRoot> plannedRoots = converter.convertDsl(query);
+
+    /*
+      Query plan without optimization:
+      LogicalProject(__key__=[$1], pageKey=[$1], companyName=['N/A'], profileName=[$5], profileAddress=[$7])
+        LogicalFilter(condition=[AND(=($5, $1), =($5, 'Mike'), =($2, 1))])
+          LogicalJoin(condition=[=($3, $2)], joinType=[inner])
+            LogicalTableScan(table=[[testavro, PAGEVIEW]])
+            LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+
+      Query plan with optimization:
+      LogicalProject(__key__=[$1], pageKey=[$1], companyName=['N/A'], profileName=[$5], profileAddress=[$7])
+        LogicalFilter(condition=[AND(=($5, $1), =($5, 'Mike'))])
+          LogicalJoin(condition=[=($3, $2)], joinType=[inner])
+            LogicalFilter(condition=[=($2, 1)])
+              LogicalTableScan(table=[[testavro, PAGEVIEW]])
+            LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+     */
+
+    // Walk the plan top-down: project, then filter, then join.
+    assertEquals(1, plannedRoots.size());
+    final RelNode projectNode = plannedRoots.iterator().next().rel;
+    assertTrue(projectNode instanceof LogicalProject);
+    final RelNode filterNode = projectNode.getInput(0);
+    assertTrue(filterNode instanceof LogicalFilter);
+    // The optimized plan is expected to lose the "=(1, $2)" conjunct from the top-level filter.
+    final String expectedTopCondition =
+        enableOptimizer ? "AND(=($1, $5), =($5, 'Mike'))" : "AND(=(1, $2), =($1, $5), =($5, 'Mike'))";
+    assertEquals(expectedTopCondition, ((LogicalFilter) filterNode).getCondition().toString());
+
+    final RelNode joinNode = filterNode.getInput(0);
+    assertTrue(joinNode instanceof LogicalJoin);
+    assertEquals(2, joinNode.getInputs().size());
+    final LogicalJoin join = (LogicalJoin) joinNode;
+    final RelNode leftInput = join.getLeft();
+    assertTrue(join.getRight() instanceof LogicalTableScan);
+
+    if (!enableOptimizer) {
+      assertTrue(leftInput instanceof LogicalTableScan);
+      return;
+    }
+    // Optimized plan: the conjunct now sits in a filter directly over the left table scan.
+    assertTrue(leftInput instanceof LogicalFilter);
+    assertEquals("=(1, $2)", ((LogicalFilter) leftInput).getCondition().toString());
+    assertTrue(leftInput.getInput(0) instanceof LogicalTableScan);
+  }
+
+  void testRemoteJoinWithUdfAndFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
+    // Verifies the plan shape of a remote-table join whose stream-side join key is built by a UDF.
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = BuildOutputRecord('id', pv.profileId)"
+            + " where p.name = 'Mike' and pv.profileId = 1 and p.name = pv.pageKey";
+
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
+
+    Config samzaConfig = new MapConfig(staticConfigs);
+    DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+    Collection<RelRoot> relRoots = dslConverter.convertDsl(sql);
+
+    /*
+      Query plan without optimization:
+      LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+        LogicalFilter(condition=[AND(=($2, 'Mike'), =($10, 1), =($2, $9))])  ==> Only the second condition could be pushed down.
+          LogicalProject(__key__=[$0], id=[$1], name=[$2], companyId=[$3], address=[$4], selfEmployed=[$5],
+                                  phoneNumbers=[$6], mapValues=[$7], __key__0=[$8], pageKey=[$9], profileId=[$10])
+                                  ==> ProjectMergeRule removes this redundant node.
+            LogicalJoin(condition=[=($0, $11)], joinType=[inner])
+              LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+              LogicalProject(__key__=[$0], pageKey=[$1], profileId=[$2], $f3=[BuildOutputRecord('id', $2)])  ==> Filter is pushed above project.
+                LogicalTableScan(table=[[testavro, PAGEVIEW]])
+
+      Query plan with optimization:
+      LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+          LogicalFilter(condition=[AND(=($2, 'Mike'), =($2, $9))])
+            LogicalJoin(condition=[=($0, $11)], joinType=[inner])
+              LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+              LogicalFilter(condition=[=($2, 1)])
+                LogicalProject(__key__=[$0], pageKey=[$1], profileId=[$2], $f3=[BuildOutputRecord('id', $2)])
+                  LogicalTableScan(table=[[testavro, PAGEVIEW]])
+     */
+
+    assertEquals(1, relRoots.size());
+    RelRoot relRoot = relRoots.iterator().next();
+    RelNode relNode = relRoot.rel;
+    assertTrue(relNode instanceof LogicalProject);
+    relNode = relNode.getInput(0);
+    assertTrue(relNode instanceof LogicalFilter);
+    if (enableOptimizer) {
+      assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+    } else {
+      assertEquals("AND(=($2, $9), =(1, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+    }
+    relNode = relNode.getInput(0);
+    if (!enableOptimizer) {
+      // The unoptimized plan keeps the redundant project between the filter and the join; step over it.
+      assertTrue(relNode instanceof LogicalProject);
+      relNode = relNode.getInput(0);
+    }
+    assertTrue(relNode instanceof LogicalJoin);
+    assertEquals(2, relNode.getInputs().size());
+    LogicalJoin join = (LogicalJoin) relNode;
+    RelNode left = join.getLeft();
+    RelNode right = join.getRight();
+    assertTrue(left instanceof LogicalTableScan);
+    if (enableOptimizer) {
+      assertTrue(right instanceof LogicalFilter);
+      assertEquals("=(1, $2)", ((LogicalFilter) right).getCondition().toString());
+      relNode = right.getInput(0);
+    } else {
+      relNode = right;
+    }
+    assertTrue(relNode instanceof LogicalProject);
+    relNode = relNode.getInput(0);
+    assertTrue(relNode instanceof LogicalTableScan);
+  }
+
+  @Test
+  public void testLocalStreamTableInnerJoinFilterOptimization() throws Exception {
+    // Plans the same local-table join with the optimizer enabled and disabled, then compares the two plans.
+    final String query =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName,"
+            + "       p.address as profileAddress "
+            + "from testavro.PROFILE.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.id = pv.profileId "
+            + "where p.name = 'Mike'";
+    final Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, query);
+
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(true));
+    final DslConverter optimizingConverter = new SamzaSqlDslConverterFactory().create(new MapConfig(configs));
+    final Collection<RelRoot> optimizedRoots = optimizingConverter.convertDsl(query);
+
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(false));
+    final DslConverter plainConverter = new SamzaSqlDslConverterFactory().create(new MapConfig(configs));
+    final Collection<RelRoot> unoptimizedRoots = plainConverter.convertDsl(query);
+
+    // There are no join filter optimizations for local joins yet, so the optimizer is expected to leave the
+    // plan untouched: both explain outputs must be identical.
+    final String optimizedText =
+        RelOptUtil.toString(optimizedRoots.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+    final String unoptimizedText =
+        RelOptUtil.toString(unoptimizedRoots.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+    assertEquals(optimizedText, unoptimizedText);
+  }
+
+  @Test
+  public void testRemoteJoinFilterPushDownWithUdfInFilterAndOptimizer() throws SamzaSqlValidatorException {
+    // The MyTest predicate in the WHERE clause only references pv (PAGEVIEW) columns.
+    final String query =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId"
+            + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = MyTest(pv.profileId)";
+
+    final Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, query);
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(true));
+
+    final DslConverter converter = new SamzaSqlDslConverterFactory().create(new MapConfig(configs));
+    final Collection<RelRoot> plannedRoots = converter.convertDsl(query);
+
+    /*
+      Query plan without optimization:
+      LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+        LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'), =($10, CAST(MyTest($10)):INTEGER))])
+          LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+            LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+            LogicalTableScan(table=[[testavro, PAGEVIEW]])
+
+      Query plan with optimization:
+      LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+        LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'))])
+          LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+            LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+            LogicalFilter(condition=[=($2, CAST(MyTest($2)):INTEGER)])
+              LogicalTableScan(table=[[testavro, PAGEVIEW]])
+     */
+
+    assertEquals(1, plannedRoots.size());
+    final RelNode projectNode = plannedRoots.iterator().next().rel;
+    assertTrue(projectNode instanceof LogicalProject);
+    final RelNode filterNode = projectNode.getInput(0);
+    assertTrue(filterNode instanceof LogicalFilter);
+    assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) filterNode).getCondition().toString());
+
+    final RelNode joinNode = filterNode.getInput(0);
+    assertTrue(joinNode instanceof LogicalJoin);
+    assertEquals(2, joinNode.getInputs().size());
+    final LogicalJoin join = (LogicalJoin) joinNode;
+    assertTrue(join.getLeft() instanceof LogicalTableScan);
+
+    // The MyTest conjunct is expected as a filter on the join's right input, directly over the table scan.
+    final RelNode pushedFilter = join.getRight();
+    assertTrue(pushedFilter instanceof LogicalFilter);
+    assertEquals("=($2, CAST(MyTest($2)):INTEGER)", ((LogicalFilter) pushedFilter).getCondition().toString());
+    assertTrue(pushedFilter.getInput(0) instanceof LogicalTableScan);
+  }
+
+  @Test
+  public void testRemoteJoinNoFilterPushDownWithUdfInFilterAndOptimizer() throws SamzaSqlValidatorException {
+    // Compares optimized and unoptimized plans for a filter whose UDF argument (p.name) comes from the remote table.
+    final String query =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId"
+            + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = MyTestPoly(p.name)";
+
+    final Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, query);
+
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(true));
+    final DslConverter optimizingConverter = new SamzaSqlDslConverterFactory().create(new MapConfig(configs));
+    final Collection<RelRoot> optimizedRoots = optimizingConverter.convertDsl(query);
+
+    configs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(false));
+    final DslConverter plainConverter = new SamzaSqlDslConverterFactory().create(new MapConfig(configs));
+    final Collection<RelRoot> unoptimizedRoots = plainConverter.convertDsl(query);
+
+    /*
+      LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+        LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'), =($10, CAST(MyTestPoly($2)):INTEGER))])
+          LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+            LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+            LogicalTableScan(table=[[testavro, PAGEVIEW]])
+     */
+
+    // Every conjunct of the filter references the remote table (p), so none of them can be pushed below the
+    // join. Hence the plans with and without optimization should be the same.
+    final String optimizedText =
+        RelOptUtil.toString(optimizedRoots.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+    final String unoptimizedText =
+        RelOptUtil.toString(unoptimizedRoots.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+    assertEquals(optimizedText, unoptimizedText);
+  }
+}
+
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index b5ebbbd..c41038d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -105,7 +105,26 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
   }
 
   @Test
-  public void testSourceEndToEndWithKey() throws SamzaSqlValidatorException {
+  public void testJoinEndToEnd() throws SamzaSqlValidatorException {
+    testJoinEndToEndHelper(false); // enableOptimizer = false
+  }
+
+  @Test
+  public void testJoinEndToEndWithUdf() throws SamzaSqlValidatorException {
+    testJoinEndToEndWithUdfHelper(false); // enableOptimizer = false
+  }
+
+  @Test
+  public void testJoinEndToEndWithOptimizer() throws SamzaSqlValidatorException {
+    testJoinEndToEndHelper(true); // enableOptimizer = true
+  }
+
+  @Test
+  public void testJoinEndToEndWithUdfAndOptimizer() throws SamzaSqlValidatorException {
+    testJoinEndToEndWithUdfHelper(true); // enableOptimizer = true
+  }
+
+  void testJoinEndToEndHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -123,6 +142,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
 
     Config config = new MapConfig(staticConfigs);
     new SamzaSqlValidator(config).validate(sqlStmts);
@@ -139,8 +159,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
     Assert.assertEquals(expectedOutMessages, outMessages);
   }
 
-  @Test
-  public void testSourceEndToEndWithKeyAndUdf() throws SamzaSqlValidatorException {
+  void testJoinEndToEndWithUdfHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
     int numMessages = 20;
 
     TestAvroSystemFactory.messages.clear();
@@ -158,6 +177,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
 
     Config config = new MapConfig(staticConfigs);
     new SamzaSqlValidator(config).validate(sqlStmts);
@@ -175,6 +195,96 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
   }
 
   @Test
+  public void testJoinEndToEndWithFilter() throws SamzaSqlValidatorException {
+    testJoinEndToEndWithFilterHelper(false); // enableOptimizer = false
+  }
+
+  @Test
+  public void testJoinEndToEndWithUdfAndFilter() throws SamzaSqlValidatorException {
+    testJoinEndToEndWithUdfAndFilterHelper(false); // enableOptimizer = false
+  }
+
+  @Test
+  public void testJoinEndToEndWithFilterAndOptimizer() throws SamzaSqlValidatorException {
+    testJoinEndToEndWithFilterHelper(true); // enableOptimizer = true
+  }
+
+  @Test
+  public void testJoinEndToEndWithUdfAndFilterAndOptimizer() throws SamzaSqlValidatorException {
+    testJoinEndToEndWithUdfAndFilterHelper(true); // enableOptimizer = true
+  }
+
+  void testJoinEndToEndWithFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
+    // Runs the remote-table join with a WHERE clause end to end and checks the single expected output row.
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    populateProfileTable(staticConfigs, numMessages);
+
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = pv.profileId"
+            + " where p.name = 'Mike' and pv.profileId = 1";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+    runApplication(config);
+
+    // Flatten each output record to "pageKey,profileName" ("null" when profileName is absent).
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(1, outMessages.size());
+    Assert.assertEquals("home,Mike", outMessages.get(0));
+  }
+
+  void testJoinEndToEndWithUdfAndFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
+    // Runs the remote-table join with a UDF-built join key and a WHERE clause end to end.
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    populateProfileTable(staticConfigs, numMessages);
+
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = BuildOutputRecord('id', pv.profileId)"
+            + " where p.name = 'Mike' and pv.profileId = 1";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
+
+    Config config = new MapConfig(staticConfigs);
+    new SamzaSqlValidator(config).validate(sqlStmts);
+    runApplication(config);
+
+    // Flatten each output record to "pageKey,profileName" ("null" when profileName is absent).
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(1, outMessages.size());
+    Assert.assertEquals("home,Mike", outMessages.get(0));
+  }
+
+  @Test
   public void testSourceEndToEndWithKeyWithNullForeignKeys() throws SamzaSqlValidatorException {
     int numMessages = 20;
 
@@ -352,7 +462,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
             + "select p.__key__ as __key__, 'UPDATE' as __op__ "
             + "from testRemoteStore.Profile.`$table` as p "
             + "join testavro.PAGEVIEW as pv "
-            + " on p.__key__ = pv.profileId ";
+            + " on p.__key__ = pv.profileId";
 
     List<String> sqlStmts = Arrays.asList(sql);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));