You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2020/07/20 06:03:08 UTC
[samza] branch master updated: SAMZA-2549 - Samza-sql: Add query
optimizations for remote table joins (#1384)
This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f665824 SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins (#1384)
f665824 is described below
commit f665824c1043bc3088bb629cedcabb37e8e91b4d
Author: Aditya Toomula <at...@linkedin.com>
AuthorDate: Sun Jul 19 23:03:00 2020 -0700
SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins (#1384)
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
* Fix checkstyle errors
* Fix checkstyle errors
* Fix checkstyle errors
* SAMZA-2549 - Samza-sql: Add query optimizations for remote table joins
---
.../apache/samza/sql/dsl/SamzaSqlDslConverter.java | 4 +-
.../org/apache/samza/sql/planner/QueryPlanner.java | 70 ++++-
.../sql/planner/SamzaSqlFilterRemoteJoinRule.java | 261 ++++++++++++++++
.../samza/sql/planner/SamzaSqlValidator.java | 7 +-
.../sql/runner/SamzaSqlApplicationConfig.java | 7 +
.../apache/samza/sql/translator/JoinInputNode.java | 40 ++-
.../samza/sql/translator/JoinTranslator.java | 49 +--
.../samza/sql/translator/QueryTranslator.java | 2 +-
.../apache/samza/sql/util/SamzaSqlQueryParser.java | 1 +
.../apache/samza/sql/planner/TestQueryPlanner.java | 345 +++++++++++++++++++++
.../test/samzasql/TestSamzaSqlRemoteTable.java | 118 ++++++-
11 files changed, 851 insertions(+), 53 deletions(-)
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index b09d3d6..acc5b42 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -52,9 +52,9 @@ public class SamzaSqlDslConverter implements DslConverter {
public Collection<RelRoot> convertDsl(String dsl) {
// TODO: Introduce an API to parse a dsl string and return one or more sql statements
List<String> sqlStmts = fetchSqlFromConfig(config);
- QueryPlanner planner = getQueryPlanner(getSqlConfig(sqlStmts, config));
List<RelRoot> relRoots = new LinkedList<>();
for (String sql: sqlStmts) {
+ QueryPlanner planner = getQueryPlanner(getSqlConfig(Collections.singletonList(sql), config));
// we always pass only select query to the planner for samza sql. The reason is that samza sql supports
// schema evolution where source and destination could up to an extent have independent schema evolution while
// calcite expects strict comformance of the destination schema with that of the fields in the select query.
@@ -87,7 +87,7 @@ public class SamzaSqlDslConverter implements DslConverter {
*/
public static QueryPlanner getQueryPlanner(SamzaSqlApplicationConfig sqlConfig) {
return new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
- sqlConfig.getUdfMetadata());
+ sqlConfig.getUdfMetadata(), sqlConfig.isQueryPlanOptimizerEnabled());
}
/**
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
index 8990bf1..767df43 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/QueryPlanner.java
@@ -19,6 +19,7 @@
package org.apache.samza.sql.planner;
+import com.google.common.collect.ImmutableList;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
@@ -28,12 +29,17 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.metadata.DefaultRelMetadataProvider;
+import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
+import org.apache.calcite.rel.rules.ProjectMergeRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.SchemaPlus;
@@ -41,6 +47,7 @@ import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.parser.SqlParser;
@@ -50,6 +57,7 @@ import org.apache.calcite.sql2rel.SqlToRelConverter;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.Programs;
import org.apache.samza.SamzaException;
import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.RelSchemaProvider;
@@ -76,11 +84,15 @@ public class QueryPlanner {
// Mapping between the source to the SqlIOConfig corresponding to the source.
private final Map<String, SqlIOConfig> systemStreamConfigBySource;
+ private final boolean isQueryPlanOptimizerEnabled;
+
public QueryPlanner(Map<String, RelSchemaProvider> relSchemaProviders,
- Map<String, SqlIOConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata) {
+ Map<String, SqlIOConfig> systemStreamConfigBySource, Collection<UdfMetadata> udfMetadata,
+ boolean isQueryPlanOptimizerEnabled) {
this.relSchemaProviders = relSchemaProviders;
this.systemStreamConfigBySource = systemStreamConfigBySource;
this.udfMetadata = udfMetadata;
+ this.isQueryPlanOptimizerEnabled = isQueryPlanOptimizerEnabled;
}
private void registerSourceSchemas(SchemaPlus rootSchema) {
@@ -109,7 +121,8 @@ public class QueryPlanner {
}
}
- public RelRoot plan(String query) {
+ private Planner getPlanner() {
+ Planner planner = null;
try {
Connection connection = DriverManager.getConnection("jdbc:calcite:");
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
@@ -117,7 +130,7 @@ public class QueryPlanner {
registerSourceSchemas(rootSchema);
List<SamzaSqlScalarFunctionImpl> samzaSqlFunctions = udfMetadata.stream()
- .map(x -> new SamzaSqlScalarFunctionImpl(x))
+ .map(SamzaSqlScalarFunctionImpl::new)
.collect(Collectors.toList());
final List<RelTraitDef> traitDefs = new ArrayList<>();
@@ -129,6 +142,13 @@ public class QueryPlanner {
sqlOperatorTables.add(new SamzaSqlOperatorTable());
sqlOperatorTables.add(new SamzaSqlUdfOperatorTable(samzaSqlFunctions));
+ // TODO: Introduce a pluggable rule factory.
+ List<RelOptRule> rules = ImmutableList.of(
+ FilterProjectTransposeRule.INSTANCE,
+ ProjectMergeRule.INSTANCE,
+ new SamzaSqlFilterRemoteJoinRule.SamzaSqlFilterIntoRemoteJoinRule(true, RelFactories.LOGICAL_BUILDER,
+ systemStreamConfigBySource));
+
// Using lenient so that !=,%,- are allowed.
FrameworkConfig frameworkConfig = Frameworks.newConfigBuilder()
.parserConfig(SqlParser.configBuilder()
@@ -140,16 +160,48 @@ public class QueryPlanner {
.operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables))
.sqlToRelConverterConfig(SqlToRelConverter.Config.DEFAULT)
.traitDefs(traitDefs)
- .context(Contexts.EMPTY_CONTEXT)
- .costFactory(null)
+ .programs(Programs.hep(rules, true, DefaultRelMetadataProvider.INSTANCE))
.build();
- Planner planner = Frameworks.getPlanner(frameworkConfig);
+ planner = Frameworks.getPlanner(frameworkConfig);
+ return planner;
+ } catch (Exception e) {
+ String errorMsg = "Failed to create planner.";
+ LOG.error(errorMsg, e);
+ if (planner != null) {
+ planner.close();
+ }
+ throw new SamzaException(errorMsg, e);
+ }
+ }
+ private RelRoot optimize(Planner planner, RelRoot relRoot) {
+ RelTraitSet relTraitSet = RelTraitSet.createEmpty();
+ try {
+ RelRoot optimizedRelRoot =
+ RelRoot.of(planner.transform(0, relTraitSet, relRoot.project()), SqlKind.SELECT);
+ LOG.info("query plan with optimization:\n"
+ + RelOptUtil.toString(optimizedRelRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+ return optimizedRelRoot;
+ } catch (Exception e) {
+ String errorMsg =
+ "Error while optimizing query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES);
+ LOG.error(errorMsg, e);
+ throw new SamzaException(errorMsg, e);
+ }
+ }
+
+ public RelRoot plan(String query) {
+ try (Planner planner = getPlanner()) {
SqlNode sql = planner.parse(query);
SqlNode validatedSql = planner.validate(sql);
RelRoot relRoot = planner.rel(validatedSql);
- LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
- return relRoot;
+ LOG.info(
+ "query plan without optimization:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+ if (!isQueryPlanOptimizerEnabled) {
+ LOG.info("Skipping query optimization as it is disabled.");
+ return relRoot;
+ }
+ return optimize(planner, relRoot);
} catch (Exception e) {
String errorMsg = SamzaSqlValidator.formatErrorString(query, e);
LOG.error(errorMsg, e);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
new file mode 100644
index 0000000..fc84f65
--- /dev/null
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.planner;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.translator.JoinInputNode;
+import org.apache.samza.sql.translator.JoinInputNode.InputType;
+
+/**
+ * Planner rule for remote table joins that pushes filters above and
+ * within a join node into its children nodes.
+ * This class is a customized form of Calcite's {@link org.apache.calcite.rel.rules.FilterJoinRule} for
+ * remote table joins.
+ */
+public abstract class SamzaSqlFilterRemoteJoinRule extends RelOptRule {
+ /** Whether to try to strengthen join-type. */
+ private final boolean smart;
+
+ Map<String, SqlIOConfig> systemStreamConfigBySource;
+
+ //~ Constructors -----------------------------------------------------------
+
+ /**
+ * Creates a SamzaSqlFilterRemoteJoinRule with an explicit root operand and
+ * factories.
+ */
+ protected SamzaSqlFilterRemoteJoinRule(RelOptRuleOperand operand, String id,
+ boolean smart, RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+ super(operand, relBuilderFactory, "SamzaSqlFilterRemoteJoinRule:" + id);
+ this.smart = smart;
+ this.systemStreamConfigBySource = systemStreamConfigBySource;
+ }
+
+ //~ Methods ----------------------------------------------------------------
+
+ protected void perform(RelOptRuleCall call, Filter filter,
+ Join join) {
+ final List<RexNode> joinFilters =
+ RelOptUtil.conjunctions(join.getCondition());
+
+ boolean donotOptimizeLeft = false;
+ boolean donotOptimizeRight = false;
+
+ JoinInputNode.InputType inputTypeOnLeft =
+ JoinInputNode.getInputType(join.getLeft(), systemStreamConfigBySource);
+ JoinInputNode.InputType inputTypeOnRight =
+ JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);
+
+ // Disable this optimization for queries using local table.
+ if (inputTypeOnLeft == InputType.LOCAL_TABLE || inputTypeOnRight == InputType.LOCAL_TABLE) {
+ donotOptimizeLeft = true;
+ donotOptimizeRight = true;
+ }
+
+ // There is nothing to optimize on the remote table side as the lookup needs to happen first before filtering.
+ if (inputTypeOnLeft == InputType.REMOTE_TABLE) {
+ donotOptimizeLeft = true;
+ }
+ if (inputTypeOnRight == InputType.REMOTE_TABLE) {
+ donotOptimizeRight = true;
+ }
+
+ // If there is only the joinRel,
+ // make sure it does not match a cartesian product joinRel
+ // (with "true" condition), otherwise this rule will be applied
+ // again on the new cartesian product joinRel.
+ if (filter == null && joinFilters.isEmpty()) {
+ return;
+ }
+
+ final List<RexNode> aboveFilters =
+ filter != null
+ ? RelOptUtil.conjunctions(filter.getCondition())
+ : new ArrayList<>();
+ final ImmutableList<RexNode> origAboveFilters =
+ ImmutableList.copyOf(aboveFilters);
+
+ // Simplify Outer Joins
+ JoinRelType joinType = join.getJoinType();
+ if (smart
+ && !origAboveFilters.isEmpty()
+ && join.getJoinType() != JoinRelType.INNER) {
+ joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
+ }
+
+ final List<RexNode> leftFilters = new ArrayList<>();
+ final List<RexNode> rightFilters = new ArrayList<>();
+
+ // TODO - add logic to derive additional filters. E.g., from
+ // (t1.a = 1 AND t2.a = 2) OR (t1.b = 3 AND t2.b = 4), you can
+ // derive table filters:
+ // (t1.a = 1 OR t1.b = 3)
+ // (t2.a = 2 OR t2.b = 4)
+
+ // Try to push down above filters. These are typically where clause
+ // filters. They can be pushed down if they are not on the NULL
+ // generating side.
+ // We do not push into join condition as we do not benefit much. There is also correctness issue
+ // with remote table as we will not have values for the remote table before the join/lookup.
+ // leftFilters and rightFilters are populated in classifyFilters API.
+ boolean filterPushed = false;
+ if (RelOptUtil.classifyFilters(
+ join,
+ aboveFilters,
+ joinType,
+ false, // Let's not push into join filter
+ !joinType.generatesNullsOnLeft() && !donotOptimizeLeft,
+ !joinType.generatesNullsOnRight() && !donotOptimizeRight,
+ joinFilters,
+ leftFilters,
+ rightFilters)) {
+ filterPushed = true;
+ }
+
+ // If no filter got pushed after validate, reset filterPushed flag
+ if (leftFilters.isEmpty()
+ && rightFilters.isEmpty()) {
+ filterPushed = false;
+ }
+
+ boolean isAntiJoin = joinType == JoinRelType.ANTI;
+
+ // Try to push down filters in ON clause. An ON clause filter can only be
+ // pushed down if it does not affect the non-matching set, i.e. it is
+ // not on the side which is preserved.
+ // An ON clause filter of anti-join cannot be pushed down.
+ if (!isAntiJoin && RelOptUtil.classifyFilters(
+ join,
+ joinFilters,
+ joinType,
+ false,
+ !joinType.generatesNullsOnLeft() && !donotOptimizeLeft,
+ !joinType.generatesNullsOnRight() && !donotOptimizeRight,
+ joinFilters,
+ leftFilters,
+ rightFilters)) {
+ filterPushed = true;
+ }
+
+ // if nothing actually got pushed and there is nothing leftover,
+ // then this rule is a no-op
+ if ((!filterPushed
+ && joinType == join.getJoinType())
+ || (joinFilters.isEmpty()
+ && leftFilters.isEmpty()
+ && rightFilters.isEmpty())) {
+ return;
+ }
+
+ // create Filters on top of the children if any filters were
+ // pushed to them
+ final RexBuilder rexBuilder = join.getCluster().getRexBuilder();
+ final RelBuilder relBuilder = call.builder();
+
+ final RelNode leftRel = relBuilder.push(join.getLeft()).filter(leftFilters).build();
+ final RelNode rightRel = relBuilder.push(join.getRight()).filter(rightFilters).build();
+
+ // create the new join node referencing the new children and
+ // containing its new join filters (if there are any)
+ final ImmutableList<RelDataType> fieldTypes =
+ ImmutableList.<RelDataType>builder()
+ .addAll(RelOptUtil.getFieldTypeList(leftRel.getRowType()))
+ .addAll(RelOptUtil.getFieldTypeList(rightRel.getRowType())).build();
+ final RexNode joinFilter =
+ RexUtil.composeConjunction(rexBuilder,
+ RexUtil.fixUp(rexBuilder, joinFilters, fieldTypes));
+
+ // If nothing actually got pushed and there is nothing leftover,
+ // then this rule is a no-op
+ if (joinFilter.isAlwaysTrue()
+ && leftFilters.isEmpty()
+ && rightFilters.isEmpty()
+ && joinType == join.getJoinType()) {
+ return;
+ }
+
+ RelNode newJoinRel =
+ join.copy(
+ join.getTraitSet(),
+ joinFilter,
+ leftRel,
+ rightRel,
+ joinType,
+ join.isSemiJoinDone());
+ call.getPlanner().onCopy(join, newJoinRel);
+ if (!leftFilters.isEmpty()) {
+ call.getPlanner().onCopy(filter, leftRel);
+ }
+ if (!rightFilters.isEmpty()) {
+ call.getPlanner().onCopy(filter, rightRel);
+ }
+
+ relBuilder.push(newJoinRel);
+
+ // Create a project on top of the join if some of the columns have become
+ // NOT NULL due to the join-type getting stricter.
+ relBuilder.convert(join.getRowType(), false);
+
+ // create a FilterRel on top of the join if needed
+ relBuilder.filter(
+ RexUtil.fixUp(rexBuilder, aboveFilters,
+ RelOptUtil.getFieldTypeList(relBuilder.peek().getRowType())));
+
+ call.transformTo(relBuilder.build());
+ }
+
+ /** Rule that tries to push the stream side of the filter expressions into the input of the join. */
+ public static class SamzaSqlFilterIntoRemoteJoinRule extends SamzaSqlFilterRemoteJoinRule {
+ public SamzaSqlFilterIntoRemoteJoinRule(boolean smart,
+ RelBuilderFactory relBuilderFactory, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+ super(
+ operand(Filter.class,
+ operand(Join.class, RelOptRule.any())),
+ "SamzaSqlFilterRemoteJoinRule:filter", smart, relBuilderFactory, systemStreamConfigBySource);
+ }
+
+ @Override public void onMatch(RelOptRuleCall call) {
+ Filter filter = call.rel(0);
+ Join join = call.rel(1);
+ perform(call, filter, join);
+ }
+ }
+}
+
+// End SamzaSqlFilterRemoteJoinRule.java
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
index 760da6d..f5a7e67 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlValidator.java
@@ -20,6 +20,7 @@
package org.apache.samza.sql.planner;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
@@ -69,10 +70,10 @@ public class SamzaSqlValidator {
* @throws SamzaSqlValidatorException exception for sql validation
*/
public void validate(List<String> sqlStmts) throws SamzaSqlValidatorException {
- SamzaSqlApplicationConfig sqlConfig = SamzaSqlDslConverter.getSqlConfig(sqlStmts, config);
- QueryPlanner planner = SamzaSqlDslConverter.getQueryPlanner(sqlConfig);
-
for (String sql: sqlStmts) {
+ SamzaSqlApplicationConfig sqlConfig = SamzaSqlDslConverter.getSqlConfig(Collections.singletonList(sql), config);
+ QueryPlanner planner = SamzaSqlDslConverter.getQueryPlanner(sqlConfig);
+
// we always pass only select query to the planner for samza sql. The reason is that samza sql supports
// schema evolution where source and destination could up to an extent have independent schema evolution while
// calcite expects strict conformance of the destination schema with that of the fields in the select query.
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 4d52469..f98879c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -90,6 +90,7 @@ public class SamzaSqlApplicationConfig {
public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix";
public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
public static final String CFG_SQL_PROCESS_SYSTEM_EVENTS = "samza.sql.processSystemEvents";
+ public static final String CFG_SQL_ENABLE_PLAN_OPTIMIZER = "samza.sql.enablePlanOptimizer";
public static final String SAMZA_SYSTEM_LOG = "log";
@@ -117,6 +118,7 @@ public class SamzaSqlApplicationConfig {
private final String metadataTopicPrefix;
private final long windowDurationMs;
private final boolean processSystemEvents;
+ private final boolean enableQueryPlanOptimizer;
public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams,
List<String> outputSystemStreams) {
@@ -170,6 +172,7 @@ public class SamzaSqlApplicationConfig {
processSystemEvents = staticConfig.getBoolean(CFG_SQL_PROCESS_SYSTEM_EVENTS, true);
windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
+ enableQueryPlanOptimizer = staticConfig.getBoolean(CFG_SQL_ENABLE_PLAN_OPTIMIZER, true);
}
public static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
@@ -336,4 +339,8 @@ public class SamzaSqlApplicationConfig {
public boolean isProcessSystemEvents() {
return processSystemEvents;
}
+
+ public boolean isQueryPlanOptimizerEnabled() {
+ return enableQueryPlanOptimizer;
+ }
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
index d952194..1a0a7ec 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinInputNode.java
@@ -20,14 +20,21 @@
package org.apache.samza.sql.translator;
import java.util.List;
+import java.util.Map;
+import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
+import org.apache.samza.table.descriptors.RemoteTableDescriptor;
/**
* This class represents the input node for the join. It can be either a table or a stream.
*/
-class JoinInputNode {
+public class JoinInputNode {
// Calcite RelNode corresponding to the input
private final RelNode relNode;
@@ -37,7 +44,7 @@ class JoinInputNode {
private final InputType inputType;
private final boolean isPosOnRight;
- enum InputType {
+ public enum InputType {
STREAM,
LOCAL_TABLE,
REMOTE_TABLE
@@ -73,4 +80,33 @@ class JoinInputNode {
boolean isPosOnRight() {
return isPosOnRight;
}
+
+ public static JoinInputNode.InputType getInputType(
+ RelNode relNode, Map<String, SqlIOConfig> systemStreamConfigBySource) {
+
+ // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
+ // stream-table-table join, the left side of the join is join output, which we always
+ // assume to be a stream. The intermediate stream won't be an instance of TableScan.
+ // The join key(s) for the table could be a UDF, in which case the relNode would be LogicalProject.
+
+ // If the relNode is a vertex in a DAG, get the real relNode. This happens due to query optimization.
+ if (relNode instanceof HepRelVertex) {
+ relNode = ((HepRelVertex) relNode).getCurrentRel();
+ }
+
+ if (relNode instanceof TableScan || relNode instanceof LogicalProject || relNode instanceof LogicalFilter) {
+ SqlIOConfig sourceTableConfig = JoinTranslator.resolveSQlIOForTable(relNode, systemStreamConfigBySource);
+ if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
+ return JoinInputNode.InputType.STREAM;
+ } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
+ sourceTableConfig.getTableDescriptor().get() instanceof CachingTableDescriptor) {
+ return JoinInputNode.InputType.REMOTE_TABLE;
+ } else {
+ return JoinInputNode.InputType.LOCAL_TABLE;
+ }
+ } else {
+ return JoinInputNode.InputType.STREAM;
+ }
+ }
+
}
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index b05c468..02c434d 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -26,7 +26,9 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.Map;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.hep.HepRelVertex;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.TableScan;
@@ -54,8 +56,6 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
import org.apache.samza.table.Table;
-import org.apache.samza.table.descriptors.CachingTableDescriptor;
-import org.apache.samza.table.descriptors.RemoteTableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,8 +94,10 @@ class JoinTranslator {
}
void translate(final LogicalJoin join, final TranslatorContext translatorContext) {
- JoinInputNode.InputType inputTypeOnLeft = getInputType(join.getLeft(), translatorContext);
- JoinInputNode.InputType inputTypeOnRight = getInputType(join.getRight(), translatorContext);
+ JoinInputNode.InputType inputTypeOnLeft = JoinInputNode.getInputType(join.getLeft(),
+ translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
+ JoinInputNode.InputType inputTypeOnRight = JoinInputNode.getInputType(join.getRight(),
+ translatorContext.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
// Do the validation of join query
validateJoinQuery(join, inputTypeOnLeft, inputTypeOnRight);
@@ -358,14 +360,19 @@ class JoinTranslator {
SqlExplainLevel.EXPPLAN_ATTRIBUTES);
}
- private SqlIOConfig resolveSQlIOForTable(RelNode relNode, TranslatorContext context) {
+ static SqlIOConfig resolveSQlIOForTable(RelNode relNode, Map<String, SqlIOConfig> systemStreamConfigBySource) {
// Let's recursively get to the TableScan node to identify IO for the table.
+
+ if (relNode instanceof HepRelVertex) {
+ return resolveSQlIOForTable(((HepRelVertex) relNode).getCurrentRel(), systemStreamConfigBySource);
+ }
+
if (relNode instanceof LogicalProject) {
- return resolveSQlIOForTable(((LogicalProject) relNode).getInput(), context);
+ return resolveSQlIOForTable(((LogicalProject) relNode).getInput(), systemStreamConfigBySource);
}
if (relNode instanceof LogicalFilter) {
- return resolveSQlIOForTable(((LogicalFilter) relNode).getInput(), context);
+ return resolveSQlIOForTable(((LogicalFilter) relNode).getInput(), systemStreamConfigBySource);
}
// We return null for table IO as the table seems to be involved in another join. The output of stream-table join
@@ -380,39 +387,17 @@ class JoinTranslator {
}
String sourceName = SqlIOConfig.getSourceFromSourceParts(relNode.getTable().getQualifiedName());
- SqlIOConfig sourceConfig =
- context.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource().get(sourceName);
+ SqlIOConfig sourceConfig = systemStreamConfigBySource.get(sourceName);
if (sourceConfig == null) {
throw new SamzaException("Unsupported source found in join statement: " + sourceName);
}
return sourceConfig;
}
- private JoinInputNode.InputType getInputType(RelNode relNode, TranslatorContext context) {
-
- // NOTE: Any intermediate form of a join is always a stream. Eg: For the second level join of
- // stream-table-table join, the left side of the join is join output, which we always
- // assume to be a stream. The intermediate stream won't be an instance of TableScan.
- // The join key(s) for the table could be an udf in which case the relNode would be LogicalProject.
-
- if (relNode instanceof TableScan || relNode instanceof LogicalProject) {
- SqlIOConfig sourceTableConfig = resolveSQlIOForTable(relNode, context);
- if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
- return JoinInputNode.InputType.STREAM;
- } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
- sourceTableConfig.getTableDescriptor().get() instanceof CachingTableDescriptor) {
- return JoinInputNode.InputType.REMOTE_TABLE;
- } else {
- return JoinInputNode.InputType.LOCAL_TABLE;
- }
- } else {
- return JoinInputNode.InputType.STREAM;
- }
- }
-
private Table getTable(JoinInputNode tableNode, TranslatorContext context) {
- SqlIOConfig sourceTableConfig = resolveSQlIOForTable(tableNode.getRelNode(), context);
+ SqlIOConfig sourceTableConfig = resolveSQlIOForTable(tableNode.getRelNode(),
+ context.getExecutionContext().getSamzaSqlApplicationConfig().getInputSystemStreamConfigBySource());
if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
String errMsg = "Failed to resolve table source in join operation: node=" + tableNode.getRelNode();
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 8247921..405fb2c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -192,7 +192,7 @@ public class QueryTranslator {
void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
QueryPlanner planner =
new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
- sqlConfig.getUdfMetadata());
+ sqlConfig.getUdfMetadata(), sqlConfig.isQueryPlanOptimizerEnabled());
final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
index f015d2a..4a6390b 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/util/SamzaSqlQueryParser.java
@@ -165,6 +165,7 @@ public class SamzaSqlQueryParser {
.traitDefs(traitDefs)
.context(Contexts.EMPTY_CONTEXT)
.costFactory(null)
+ //.programs(Programs.CALC_PROGRAM)
.build();
return Frameworks.getPlanner(frameworkConfig);
}
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
new file mode 100644
index 0000000..2a9dc13
--- /dev/null
+++ b/samza-sql/src/test/java/org/apache/samza/sql/planner/TestQueryPlanner.java
@@ -0,0 +1,345 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql.planner;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalJoin;
+import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.calcite.rel.logical.LogicalTableScan;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.dsl.SamzaSqlDslConverterFactory;
+import org.apache.samza.sql.interfaces.DslConverter;
+import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.util.SamzaSqlTestConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestQueryPlanner {
+
+ @Test
+ public void testTranslate() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+ String sql =
+ "Insert into testavro.outputTopic(id) select MyTest(id) from testavro.level1.level2.SIMPLE1 as s where s.id = 10";
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+ Collection<RelRoot> relRoots = dslConverter.convertDsl(sql);
+ assertEquals(1, relRoots.size());
+ }
+
+ @Test
+ public void testRemoteJoinWithFilter() throws SamzaSqlValidatorException {
+ testRemoteJoinWithFilterHelper(false);
+ }
+
+ @Test
+ public void testRemoteJoinWithUdfAndFilter() throws SamzaSqlValidatorException {
+ testRemoteJoinWithUdfAndFilterHelper(false);
+ }
+
+ @Test
+ public void testRemoteJoinWithFilterAndOptimizer() throws SamzaSqlValidatorException {
+ testRemoteJoinWithFilterHelper(true);
+ }
+
+ @Test
+ public void testRemoteJoinWithUdfAndFilterAndOptimizer() throws SamzaSqlValidatorException {
+ testRemoteJoinWithUdfAndFilterHelper(true);
+ }
+
+ void testRemoteJoinWithFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testavro.PAGEVIEW as pv "
+ + "join testRemoteStore.Profile.`$table` as p "
+ + " on p.__key__ = pv.profileId"
+ + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = 1";
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
+
+ Config samzaConfig = new MapConfig(staticConfigs);
+ DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+ Collection<RelRoot> relRoots = dslConverter.convertDsl(sql);
+
+ /*
+ Query plan without optimization:
+ LogicalProject(__key__=[$1], pageKey=[$1], companyName=['N/A'], profileName=[$5], profileAddress=[$7])
+ LogicalFilter(condition=[AND(=(1, $2), =($1, $5), =($5, 'Mike'))])
+ LogicalJoin(condition=[=($3, $2)], joinType=[inner])
+ LogicalTableScan(table=[[testavro, PAGEVIEW]])
+ LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+
+ Query plan with optimization:
+ LogicalProject(__key__=[$1], pageKey=[$1], companyName=['N/A'], profileName=[$5], profileAddress=[$7])
+ LogicalFilter(condition=[AND(=($1, $5), =($5, 'Mike'))])
+ LogicalJoin(condition=[=($3, $2)], joinType=[inner])
+ LogicalFilter(condition=[=(1, $2)])
+ LogicalTableScan(table=[[testavro, PAGEVIEW]])
+ LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+ */
+
+ assertEquals(1, relRoots.size());
+ RelRoot relRoot = relRoots.iterator().next();
+ RelNode relNode = relRoot.rel;
+ assertTrue(relNode instanceof LogicalProject);
+ relNode = relNode.getInput(0);
+ assertTrue(relNode instanceof LogicalFilter);
+ if (enableOptimizer) {
+ assertEquals("AND(=($1, $5), =($5, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+ } else {
+ assertEquals("AND(=(1, $2), =($1, $5), =($5, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+ }
+ relNode = relNode.getInput(0);
+ assertTrue(relNode instanceof LogicalJoin);
+ assertEquals(2, relNode.getInputs().size());
+ LogicalJoin join = (LogicalJoin) relNode;
+ RelNode left = join.getLeft();
+ RelNode right = join.getRight();
+ assertTrue(right instanceof LogicalTableScan);
+ if (enableOptimizer) {
+ assertTrue(left instanceof LogicalFilter);
+ assertEquals("=(1, $2)", ((LogicalFilter) left).getCondition().toString());
+ assertTrue(left.getInput(0) instanceof LogicalTableScan);
+ } else {
+ assertTrue(left instanceof LogicalTableScan);
+ }
+ }
+
+ void testRemoteJoinWithUdfAndFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testRemoteStore.Profile.`$table` as p "
+ + "join testavro.PAGEVIEW as pv "
+ + " on p.__key__ = BuildOutputRecord('id', pv.profileId)"
+ + " where p.name = 'Mike' and pv.profileId = 1 and p.name = pv.pageKey";
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
+
+ Config samzaConfig = new MapConfig(staticConfigs);
+ DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+ Collection<RelRoot> relRoots = dslConverter.convertDsl(sql);
+
+ /*
+ Query plan without optimization:
+ LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+ LogicalFilter(condition=[AND(=($2, $9), =(1, $10), =($2, 'Mike'))]) ==> Only the second condition could be pushed down.
+ LogicalProject(__key__=[$0], id=[$1], name=[$2], companyId=[$3], address=[$4], selfEmployed=[$5],
+ phoneNumbers=[$6], mapValues=[$7], __key__0=[$8], pageKey=[$9], profileId=[$10])
+ ==> ProjectMergeRule removes this redundant node.
+ LogicalJoin(condition=[=($0, $11)], joinType=[inner])
+ LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+ LogicalProject(__key__=[$0], pageKey=[$1], profileId=[$2], $f3=[BuildOutputRecord('id', $2)]) ==> Filter is pushed above project.
+ LogicalTableScan(table=[[testavro, PAGEVIEW]])
+
+ Query plan with optimization:
+ LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+ LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'))])
+ LogicalJoin(condition=[=($0, $11)], joinType=[inner])
+ LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+ LogicalFilter(condition=[=(1, $2)])
+ LogicalProject(__key__=[$0], pageKey=[$1], profileId=[$2], $f3=[BuildOutputRecord('id', $2)])
+ LogicalTableScan(table=[[testavro, PAGEVIEW]])
+ */
+
+ assertEquals(1, relRoots.size());
+ RelRoot relRoot = relRoots.iterator().next();
+ RelNode relNode = relRoot.rel;
+ assertTrue(relNode instanceof LogicalProject);
+ relNode = relNode.getInput(0);
+ assertTrue(relNode instanceof LogicalFilter);
+ if (enableOptimizer) {
+ assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+ } else {
+ assertEquals("AND(=($2, $9), =(1, $10), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+ }
+ relNode = relNode.getInput(0);
+ if (enableOptimizer) {
+ assertTrue(relNode instanceof LogicalJoin);
+ assertEquals(2, relNode.getInputs().size());
+ } else {
+ assertTrue(relNode instanceof LogicalProject);
+ relNode = relNode.getInput(0);
+ }
+ LogicalJoin join = (LogicalJoin) relNode;
+ RelNode left = join.getLeft();
+ RelNode right = join.getRight();
+ assertTrue(left instanceof LogicalTableScan);
+ if (enableOptimizer) {
+ assertTrue(right instanceof LogicalFilter);
+ assertEquals("=(1, $2)", ((LogicalFilter) right).getCondition().toString());
+ relNode = right.getInput(0);
+ } else {
+ relNode = right;
+ }
+ assertTrue(relNode instanceof LogicalProject);
+ relNode = relNode.getInput(0);
+ assertTrue(relNode instanceof LogicalTableScan);
+ }
+
+ @Test
+ public void testLocalStreamTableInnerJoinFilterOptimization() throws Exception {
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, p.name as companyName, p.name as profileName,"
+ + " p.address as profileAddress "
+ + "from testavro.PROFILE.`$table` as p "
+ + "join testavro.PAGEVIEW as pv "
+ + " on p.id = pv.profileId "
+ + "where p.name = 'Mike'";
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(true));
+
+ Config samzaConfig = new MapConfig(staticConfigs);
+ DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+ Collection<RelRoot> relRootsWithOptimization = dslConverter.convertDsl(sql);
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(false));
+
+ samzaConfig = new MapConfig(staticConfigs);
+ dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+ Collection<RelRoot> relRootsWithoutOptimization = dslConverter.convertDsl(sql);
+
+ // We do not yet have any join filter optimizations for local joins. Hence the plans with and without optimization
+ // should be the same.
+ assertEquals(RelOptUtil.toString(relRootsWithOptimization.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES),
+ RelOptUtil.toString(relRootsWithoutOptimization.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+ }
+
+ @Test
+ public void testRemoteJoinFilterPushDownWithUdfInFilterAndOptimizer() throws SamzaSqlValidatorException {
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testRemoteStore.Profile.`$table` as p "
+ + "join testavro.PAGEVIEW as pv "
+ + " on p.__key__ = pv.profileId"
+ + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = MyTest(pv.profileId)";
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(true));
+
+ Config samzaConfig = new MapConfig(staticConfigs);
+ DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+ Collection<RelRoot> relRoots = dslConverter.convertDsl(sql);
+
+ /*
+ Query plan without optimization:
+ LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+ LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'), =($10, CAST(MyTest($10)):INTEGER))])
+ LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+ LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+ LogicalTableScan(table=[[testavro, PAGEVIEW]])
+
+ Query plan with optimization:
+ LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+ LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'))])
+ LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+ LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+ LogicalFilter(condition=[=($2, CAST(MyTest($2)):INTEGER)])
+ LogicalTableScan(table=[[testavro, PAGEVIEW]])
+ */
+
+ assertEquals(1, relRoots.size());
+ RelRoot relRoot = relRoots.iterator().next();
+ RelNode relNode = relRoot.rel;
+ assertTrue(relNode instanceof LogicalProject);
+ relNode = relNode.getInput(0);
+ assertTrue(relNode instanceof LogicalFilter);
+ assertEquals("AND(=($2, $9), =($2, 'Mike'))", ((LogicalFilter) relNode).getCondition().toString());
+ relNode = relNode.getInput(0);
+ assertTrue(relNode instanceof LogicalJoin);
+ assertEquals(2, relNode.getInputs().size());
+ LogicalJoin join = (LogicalJoin) relNode;
+ RelNode left = join.getLeft();
+ RelNode right = join.getRight();
+ assertTrue(left instanceof LogicalTableScan);
+ assertTrue(right instanceof LogicalFilter);
+ assertEquals("=($2, CAST(MyTest($2)):INTEGER)", ((LogicalFilter) right).getCondition().toString());
+ assertTrue(right.getInput(0) instanceof LogicalTableScan);
+ }
+
+ @Test
+ public void testRemoteJoinNoFilterPushDownWithUdfInFilterAndOptimizer() throws SamzaSqlValidatorException {
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(1);
+
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testRemoteStore.Profile.`$table` as p "
+ + "join testavro.PAGEVIEW as pv "
+ + " on p.__key__ = pv.profileId"
+ + " where p.name = pv.pageKey AND p.name = 'Mike' AND pv.profileId = MyTestPoly(p.name)";
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(true));
+
+ Config samzaConfig = new MapConfig(staticConfigs);
+ DslConverter dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+ Collection<RelRoot> relRootsWithOptimization = dslConverter.convertDsl(sql);
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(false));
+
+ samzaConfig = new MapConfig(staticConfigs);
+ dslConverter = new SamzaSqlDslConverterFactory().create(samzaConfig);
+ Collection<RelRoot> relRootsWithoutOptimization = dslConverter.convertDsl(sql);
+
+ /*
+ LogicalProject(__key__=[$9], pageKey=[$9], companyName=['N/A'], profileName=[$2], profileAddress=[$4])
+ LogicalFilter(condition=[AND(=($2, $9), =($2, 'Mike'), =($10, CAST(MyTestPoly($2)):INTEGER))])
+ LogicalJoin(condition=[=($0, $10)], joinType=[inner])
+ LogicalTableScan(table=[[testRemoteStore, Profile, $table]])
+ LogicalTableScan(table=[[testavro, PAGEVIEW]])
+ */
+
+ // None of the conditions in the filter could be pushed down as they all require a remote call. Hence the plans
+ // with and without optimization should be the same.
+ assertEquals(RelOptUtil.toString(relRootsWithOptimization.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES),
+ RelOptUtil.toString(relRootsWithoutOptimization.iterator().next().rel, SqlExplainLevel.EXPPLAN_ATTRIBUTES));
+ }
+}
+
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
index b5ebbbd..c41038d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java
@@ -105,7 +105,26 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
}
@Test
- public void testSourceEndToEndWithKey() throws SamzaSqlValidatorException {
+ public void testJoinEndToEnd() throws SamzaSqlValidatorException {
+ testJoinEndToEndHelper(false);
+ }
+
+ @Test
+ public void testJoinEndToEndWithUdf() throws SamzaSqlValidatorException {
+ testJoinEndToEndWithUdfHelper(false);
+ }
+
+ @Test
+ public void testJoinEndToEndWithOptimizer() throws SamzaSqlValidatorException {
+ testJoinEndToEndHelper(true);
+ }
+
+ @Test
+ public void testJoinEndToEndWithUdfAndOptimizer() throws SamzaSqlValidatorException {
+ testJoinEndToEndWithUdfHelper(true);
+ }
+
+ void testJoinEndToEndHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -123,6 +142,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
Config config = new MapConfig(staticConfigs);
new SamzaSqlValidator(config).validate(sqlStmts);
@@ -139,8 +159,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
Assert.assertEquals(expectedOutMessages, outMessages);
}
- @Test
- public void testSourceEndToEndWithKeyAndUdf() throws SamzaSqlValidatorException {
+ void testJoinEndToEndWithUdfHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -158,6 +177,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
Config config = new MapConfig(staticConfigs);
new SamzaSqlValidator(config).validate(sqlStmts);
@@ -175,6 +195,96 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
}
@Test
+ public void testJoinEndToEndWithFilter() throws SamzaSqlValidatorException {
+ testJoinEndToEndWithFilterHelper(false);
+ }
+
+ @Test
+ public void testJoinEndToEndWithUdfAndFilter() throws SamzaSqlValidatorException {
+ testJoinEndToEndWithUdfAndFilterHelper(false);
+ }
+
+ @Test
+ public void testJoinEndToEndWithFilterAndOptimizer() throws SamzaSqlValidatorException {
+ testJoinEndToEndWithFilterHelper(true);
+ }
+
+ @Test
+ public void testJoinEndToEndWithUdfAndFilterAndOptimizer() throws SamzaSqlValidatorException {
+ testJoinEndToEndWithUdfAndFilterHelper(true);
+ }
+
+ void testJoinEndToEndWithFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ RemoteStoreIOResolverTestFactory.records.clear();
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+ populateProfileTable(staticConfigs, numMessages);
+
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testRemoteStore.Profile.`$table` as p "
+ + "join testavro.PAGEVIEW as pv "
+ + " on p.__key__ = pv.profileId"
+ + " where p.name = 'Mike' and pv.profileId = 1";
+
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
+ List<String> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+ + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+ ((GenericRecord) x.getMessage()).get("profileName").toString()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(1, outMessages.size());
+ Assert.assertEquals(outMessages.get(0), "home,Mike");
+ }
+
+ void testJoinEndToEndWithUdfAndFilterHelper(boolean enableOptimizer) throws SamzaSqlValidatorException {
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ RemoteStoreIOResolverTestFactory.records.clear();
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+ populateProfileTable(staticConfigs, numMessages);
+
+ String sql =
+ "Insert into testavro.enrichedPageViewTopic "
+ + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+ + " p.name as profileName, p.address as profileAddress "
+ + "from testRemoteStore.Profile.`$table` as p "
+ + "join testavro.PAGEVIEW as pv "
+ + " on p.__key__ = BuildOutputRecord('id', pv.profileId)"
+ + " where p.name = 'Mike' and pv.profileId = 1";
+
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_ENABLE_PLAN_OPTIMIZER, Boolean.toString(enableOptimizer));
+
+ Config config = new MapConfig(staticConfigs);
+ new SamzaSqlValidator(config).validate(sqlStmts);
+
+ runApplication(config);
+
+ List<String> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+ + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+ ((GenericRecord) x.getMessage()).get("profileName").toString()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(1, outMessages.size());
+ Assert.assertEquals(outMessages.get(0), "home,Mike");
+ }
+
+ @Test
public void testSourceEndToEndWithKeyWithNullForeignKeys() throws SamzaSqlValidatorException {
int numMessages = 20;
@@ -352,7 +462,7 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
+ "select p.__key__ as __key__, 'UPDATE' as __op__ "
+ "from testRemoteStore.Profile.`$table` as p "
+ "join testavro.PAGEVIEW as pv "
- + " on p.__key__ = pv.profileId ";
+ + " on p.__key__ = pv.profileId";
List<String> sqlStmts = Arrays.asList(sql);
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));