You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/28 23:53:41 UTC
samza git commit: SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect
LogicalTableModify in the Calcite plan.
Repository: samza
Updated Branches:
refs/heads/master 5069f1ddb -> ff607cb6b
SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
This is required for supporting schema evolution without failing the jobs.
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>
Closes #821 from atoomula/modify and squashes the following commits:
17b4b1c1 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
65be581a [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
fb50ee81 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
9fff9573 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
f3e887c6 [Aditya Toomula] dummy
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ff607cb6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ff607cb6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ff607cb6
Branch: refs/heads/master
Commit: ff607cb6b61144ea6feb28842318e1eb1e5eaa5a
Parents: 5069f1d
Author: Aditya Toomula <at...@linkedin.com>
Authored: Wed Nov 28 15:53:30 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Wed Nov 28 15:53:30 2018 -0800
----------------------------------------------------------------------
.../samza/sql/dsl/SamzaSqlDslConverter.java | 16 +-
.../samza/sql/runner/SamzaSqlApplication.java | 10 +-
.../sql/runner/SamzaSqlApplicationConfig.java | 60 ++---
.../sql/runner/SamzaSqlApplicationRunner.java | 7 +-
.../samza/sql/translator/ModifyTranslator.java | 127 -----------
.../samza/sql/translator/QueryTranslator.java | 52 ++---
.../runner/TestSamzaSqlApplicationConfig.java | 55 ++++-
.../sql/translator/TestQueryTranslator.java | 221 +++++++++++++++----
.../test/samzasql/TestSamzaSqlEndToEnd.java | 4 +-
9 files changed, 295 insertions(+), 257 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index 6cce906..d4cb134 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -55,22 +55,20 @@ public class SamzaSqlDslConverter implements DslConverter {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config,
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
QueryPlanner planner =
- new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
+ new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
sqlConfig.getUdfMetadata());
List<RelRoot> relRoots = new LinkedList<>();
for (String sql: sqlStmts) {
- // when sql is a query, we only pass the select query to the planner
+ // We always pass only the select query to the planner for Samza SQL. The reason is that Samza SQL supports
+ // schema evolution where source and destination could, to an extent, have independent schema evolution while
+ // Calcite expects strict conformance of the destination schema with that of the fields in the select query.
SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql);
- if (qinfo.getSink().split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
- sql = qinfo.getSelectQuery();
- }
-
- relRoots.add(planner.plan(sql));
+ relRoots.add(planner.plan(qinfo.getSelectQuery()));
}
return relRoots;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index b8bb190..868f5a2 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -21,10 +21,9 @@ package org.apache.samza.sql.runner;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.calcite.rel.RelRoot;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
@@ -52,8 +51,8 @@ public class SamzaSqlApplication implements StreamApplication {
Map<Integer, TranslatorContext> translatorContextMap = new HashMap<>();
// 1. Get Calcite plan
- Set<String> inputSystemStreams = new HashSet<>();
- Set<String> outputSystemStreams = new HashSet<>();
+ List<String> inputSystemStreams = new LinkedList<>();
+ List<String> outputSystemStreams = new LinkedList<>();
Collection<RelRoot> relRoots =
SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDescriptor.getConfig(),
@@ -66,12 +65,13 @@ public class SamzaSqlApplication implements StreamApplication {
// 3. Translate Calcite plan to Samza stream operators
QueryTranslator queryTranslator = new QueryTranslator(appDescriptor, sqlConfig);
SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
+ // QueryId implies the index of the query in multiple query statements scenario. It should always start with 0.
int queryId = 0;
for (RelRoot relRoot : relRoots) {
LOG.info("Translating relRoot {} to samza stream graph with queryId {}", relRoot, queryId);
TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext);
translatorContextMap.put(queryId, translatorContext);
- queryTranslator.translate(relRoot, translatorContext, queryId);
+ queryTranslator.translate(relRoot, sqlConfig.getOutputSystemStreams().get(queryId), translatorContext, queryId);
queryId++;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 6e12c02..e608794 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -106,23 +107,33 @@ public class SamzaSqlApplicationConfig {
private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource;
private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
- private final Map<String, SqlIOConfig> systemStreamConfigsBySource;
+
+ // There can be only one output system stream per Samza SQL statement. The list below stores the
+ // output system streams in the order of the SQL query statements. Please note that there could be duplicate entries
+ // in it during a fan-in scenario (e.g. two sql statements with two different input streams but same output stream).
+ private final List<String> outputSystemStreams;
private final String metadataTopicPrefix;
private final long windowDurationMs;
- public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams,
- Set<String> outputSystemStreams) {
+ public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams,
+ List<String> outputSystemStreams) {
ioResolver = createIOResolver(staticConfig);
- inputSystemStreamConfigBySource = inputSystemStreams.stream()
+ this.outputSystemStreams = new LinkedList<>(outputSystemStreams);
+
+ // There could be duplicate streams across different queries. Let's dedupe them.
+ Set<String> inputSystemStreamSet = new HashSet<>(inputSystemStreams);
+ Set<String> outputSystemStreamSet = new HashSet<>(outputSystemStreams);
+
+ inputSystemStreamConfigBySource = inputSystemStreamSet.stream()
.collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src)));
- outputSystemStreamConfigsBySource = outputSystemStreams.stream()
+ outputSystemStreamConfigsBySource = outputSystemStreamSet.stream()
.collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x)));
- systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource);
+ Map<String, SqlIOConfig> systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource);
systemStreamConfigsBySource.putAll(outputSystemStreamConfigsBySource);
Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(systemStreamConfigsBySource.values());
@@ -140,7 +151,7 @@ public class SamzaSqlApplicationConfig {
relSchemaProvidersBySource.get(x.getSource()), c))));
samzaRelTableKeyConvertersBySource = systemStreamConfigs.stream()
- .filter(config -> config.isRemoteTable())
+ .filter(SqlIOConfig::isRemoteTable)
.collect(Collectors.toMap(SqlIOConfig::getSource,
x -> initializePlugin("SamzaRelTableKeyConverter", x.getSamzaRelTableKeyConverterName(),
staticConfig, CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN,
@@ -152,13 +163,6 @@ public class SamzaSqlApplicationConfig {
metadataTopicPrefix =
staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
-
- // remove the SqlIOConfigs of outputs whose system is "log" out of systemStreamConfigsBySource
- outputSystemStreamConfigsBySource.forEach((k, v) -> {
- if (k.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
- systemStreamConfigsBySource.remove(k);
- }
- });
}
public static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
@@ -229,31 +233,35 @@ public class SamzaSqlApplicationConfig {
}
public static Collection<RelRoot> populateSystemStreamsAndGetRelRoots(List<String> dslStmts, Config config,
- Set<String> inputSystemStreams, Set<String> outputSystemStreams) {
+ List<String> inputSystemStreams, List<String> outputSystemStreams) {
// TODO: Get the converter factory based on the file type. Create abstraction around this.
DslConverterFactory dslConverterFactory = new SamzaSqlDslConverterFactory();
DslConverter dslConverter = dslConverterFactory.create(config);
Collection<RelRoot> relRoots = dslConverter.convertDsl(String.join("\n", dslStmts));
- // FIXME: the snippet below dose not work when sql is a query
+ // RelRoot does not have sink node for Samza SQL dsl, so we can not traverse the relRoot tree to get
+ // "outputSystemStreams"
+ // FIXME: the snippet below does not work for Samza SQL dsl but is required for other dsls. Future fix could be
+ // for samza sql to build TableModify for sink and stick it to the relRoot, so we could get output stream out of it.
+
// for (RelRoot relRoot : relRoots) {
// SamzaSqlApplicationConfig.populateSystemStreams(relRoot.project(), inputSystemStreams, outputSystemStreams);
// }
- // RelRoot does not have sink node (aka. log.outputStream) when Sql statement is a query, so we
- // can not traverse the tree of relRoot to get "outputSystemStreams"
+ // The below code is specific to Samza SQL dsl and should be removed once Samza SQL includes sink as part of
+ // relRoot and the above code is uncommented.
List<String> sqlStmts = SamzaSqlDslConverter.fetchSqlFromConfig(config);
List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlDslConverter.fetchQueryInfo(sqlStmts);
inputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()));
- outputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()));
+ outputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
return relRoots;
}
- private static void populateSystemStreams(RelNode relNode, Set<String> inputSystemStreams,
- Set<String> outputSystemStreams) {
+ private static void populateSystemStreams(RelNode relNode, List<String> inputSystemStreams,
+ List<String> outputSystemStreams) {
if (relNode instanceof TableModify) {
outputSystemStreams.add(getSystemStreamName(relNode));
} else {
@@ -282,6 +290,10 @@ public class SamzaSqlApplicationConfig {
return udfMetadata;
}
+ public List<String> getOutputSystemStreams() {
+ return outputSystemStreams;
+ }
+
public Map<String, SqlIOConfig> getInputSystemStreamConfigBySource() {
return inputSystemStreamConfigBySource;
}
@@ -290,10 +302,6 @@ public class SamzaSqlApplicationConfig {
return outputSystemStreamConfigsBySource;
}
- public Map<String, SqlIOConfig> getSystemStreamConfigsBySource() {
- return systemStreamConfigsBySource;
- }
-
public Map<String, SamzaRelConverter> getSamzaRelConverters() {
return samzaRelConvertersBySource;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 41e37f1..d9a44ec 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -21,10 +21,9 @@ package org.apache.samza.sql.runner;
import java.time.Duration;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.commons.lang3.Validate;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.config.Config;
@@ -83,8 +82,8 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(dslStmts);
newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, sqlJson);
- Set<String> inputSystemStreams = new HashSet<>();
- Set<String> outputSystemStreams = new HashSet<>();
+ List<String> inputSystemStreams = new LinkedList<>();
+ List<String> outputSystemStreams = new LinkedList<>();
SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, config, inputSystemStreams,
outputSystemStreams);
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
deleted file mode 100644
index dbeabab..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.translator;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.system.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.table.Table;
-
-
-/**
- * Translator to translate the TableModify in relational graph to the corresponding output streams in the StreamGraph
- * implementation
- */
-class ModifyTranslator {
-
- private final Map<String, SamzaRelConverter> relMsgConverters;
- private final Map<String, SqlIOConfig> systemStreamConfig;
- private final int queryId;
-
- ModifyTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc, int queryId) {
- relMsgConverters = converters;
- this.systemStreamConfig = ssc;
- this.queryId = queryId;
- }
-
- // OutputMapFunction converts SamzaSqlRelMessage to SamzaMessage in KV format
- private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
- // All the user-supplied functions are expected to be serializable in order to enable full serialization of user
- // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
- // initialization.
- private transient SamzaRelConverter samzaMsgConverter;
- private final String outputTopic;
- private final int queryId;
-
- OutputMapFunction(String outputTopic, int queryId) {
- this.outputTopic = outputTopic;
- this.queryId = queryId;
- }
-
- @Override
- public void init(Context context) {
- TranslatorContext translatorContext =
- ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
- this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic);
- }
-
- @Override
- public KV<Object, Object> apply(SamzaSqlRelMessage message) {
- return this.samzaMsgConverter.convertToSamzaMessage(message);
- }
- }
-
- void translate(final TableModify tableModify, final TranslatorContext context, Map<String, DelegatingSystemDescriptor> systemDescriptors,
- Map<String, OutputStream> outputMsgStreams) {
-
- StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
- List<String> tableNameParts = tableModify.getTable().getQualifiedName();
- String targetName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
-
- Validate.isTrue(relMsgConverters.containsKey(targetName), String.format("Unknown source %s", targetName));
-
- SqlIOConfig sinkConfig = systemStreamConfig.get(targetName);
-
- final String systemName = sinkConfig.getSystemName();
- final String streamId = sinkConfig.getStreamId();
- final String source = sinkConfig.getSource();
-
- KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
- DelegatingSystemDescriptor
- sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
- GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamId, noOpKVSerde);
-
- MessageStreamImpl<SamzaSqlRelMessage> stream =
- (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(tableModify.getInput().getId());
- MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(targetName, queryId));
-
- Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
- if (!tableDescriptor.isPresent()) {
- OutputStream stm = outputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getOutputStream(osd));
- outputStream.sendTo(stm);
- } else {
- Table outputTable = streamAppDesc.getTable(tableDescriptor.get());
- if (outputTable == null) {
- String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
- throw new SamzaException(msg);
- }
- outputStream.sendTo(outputTable);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index bb34a41..fef5471 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -32,6 +32,7 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalFilter;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.commons.lang.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.context.Context;
@@ -71,7 +72,6 @@ public class QueryTranslator {
private transient SamzaRelConverter samzaMsgConverter;
private final String outputTopic;
private final int queryId;
- static OutputStream logOutputStream;
OutputMapFunction(String outputTopic, int queryId) {
this.outputTopic = outputTopic;
@@ -103,14 +103,14 @@ public class QueryTranslator {
* For unit testing only
*/
@VisibleForTesting
- public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
+ void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
QueryPlanner planner =
- new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
+ new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
sqlConfig.getUdfMetadata());
- final RelRoot relRoot = planner.plan(queryInfo.getSql());
+ final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext);
- translate(relRoot, translatorContext, queryId);
+ translate(relRoot, sqlConfig.getOutputSystemStreams().get(queryId), translatorContext, queryId);
Map<Integer, TranslatorContext> translatorContexts = new HashMap<>();
translatorContexts.put(queryId, translatorContext.clone());
appDesc.withApplicationTaskContextFactory((jobContext,
@@ -120,12 +120,19 @@ public class QueryTranslator {
new SamzaSqlApplicationContext(translatorContexts));
}
- public void translate(RelRoot relRoot, TranslatorContext translatorContext, int queryId) {
+ /**
+ * Translate Calcite plan to Samza stream operators.
+ * @param relRoot Calcite plan in the form of {@link RelRoot}. RelRoot should not include the sink ({@link TableModify})
+ * @param outputSystemStream Sink associated with the Calcite plan.
+ * @param translatorContext Context maintained across translations.
+ * @param queryId query index of the sql statement corresponding to the Calcite plan in multi SQL statement scenario
+ * starting with index 0.
+ */
+ public void translate(RelRoot relRoot, String outputSystemStream, TranslatorContext translatorContext, int queryId) {
final RelNode node = relRoot.project();
+
ScanTranslator scanTranslator =
new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource(), queryId);
- ModifyTranslator modifyTranslator =
- new ModifyTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getOutputSystemStreamConfigsBySource(), queryId);
node.accept(new RelShuttleImpl() {
int windowId = 0;
@@ -134,21 +141,11 @@ public class QueryTranslator {
@Override
public RelNode visit(RelNode relNode) {
- if (relNode instanceof TableModify) {
- return visit((TableModify) relNode);
- }
+ // There should never be a TableModify in the calcite plan.
+ Validate.isTrue(!(relNode instanceof TableModify));
return super.visit(relNode);
}
- private RelNode visit(TableModify modify) {
- if (!modify.isInsert()) {
- throw new SamzaException("Not a supported operation: " + modify.toString());
- }
- RelNode node = super.visit(modify);
- modifyTranslator.translate(modify, translatorContext, systemDescriptors, outputMsgStreams);
- return node;
- }
-
@Override
public RelNode visit(TableScan scan) {
RelNode node = super.visit(scan);
@@ -190,14 +187,7 @@ public class QueryTranslator {
}
});
- // the snippet below will be performed only when sql is a query statement
- sqlConfig.getOutputSystemStreamConfigsBySource().keySet().forEach(
- key -> {
- if (key.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
- sendToOutputStream(key, streamAppDescriptor, translatorContext, node, queryId);
- }
- }
- );
+ sendToOutputStream(outputSystemStream, streamAppDescriptor, translatorContext, node, queryId);
}
private void sendToOutputStream(String sinkStream, StreamApplicationDescriptor appDesc, TranslatorContext context, RelNode node, int queryId) {
@@ -211,10 +201,8 @@ public class QueryTranslator {
DelegatingSystemDescriptor
sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
- if (OutputMapFunction.logOutputStream == null) {
- OutputMapFunction.logOutputStream = appDesc.getOutputStream(osd);
- }
- outputStream.sendTo(OutputMapFunction.logOutputStream);
+ OutputStream stm = outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(osd));
+ outputStream.sendTo(stm);
} else {
Table outputTable = appDesc.getTable(tableDescriptor.get());
if (outputTable == null) {
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index 46c0651..294eccd 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -37,7 +37,6 @@ import org.junit.Assert;
import org.junit.Test;
import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
public class TestSamzaSqlApplicationConfig {
@@ -53,8 +52,8 @@ public class TestSamzaSqlApplicationConfig {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
Assert.assertEquals(numUdfs, samzaSqlApplicationConfig.getUdfMetadata().size());
Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
@@ -80,8 +79,8 @@ public class TestSamzaSqlApplicationConfig {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER);
testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
@@ -98,7 +97,7 @@ public class TestSamzaSqlApplicationConfig {
}
@Test
- public void testGetInputAndOutputStreamConfigs() {
+ public void testGetInputAndOutputStreamConfigsFanOut() {
List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1",
"insert into testavro.Profile select * from testavro.SIMPLE1");
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
@@ -107,16 +106,48 @@ public class TestSamzaSqlApplicationConfig {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet();
Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet();
+ List<String> outputStreamList = samzaSqlApplicationConfig.getOutputSystemStreams();
+
Assert.assertEquals(1, inputKeys.size());
Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1"));
Assert.assertEquals(2, outputKeys.size());
Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1"));
Assert.assertTrue(outputKeys.contains("testavro.Profile"));
+ Assert.assertEquals(2, outputStreamList.size());
+ Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(0));
+ Assert.assertEquals("testavro.Profile", outputStreamList.get(1));
+ }
+
+ @Test
+ public void testGetInputAndOutputStreamConfigsFanIn() {
+ List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1",
+ "insert into testavro.COMPLEX1 select * from testavro.SIMPLE2");
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+ Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet();
+ Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet();
+ List<String> outputStreamList = samzaSqlApplicationConfig.getOutputSystemStreams();
+
+ Assert.assertEquals(2, inputKeys.size());
+ Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1"));
+ Assert.assertTrue(inputKeys.contains("testavro.SIMPLE2"));
+ Assert.assertEquals(1, outputKeys.size());
+ Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1"));
+ Assert.assertEquals(2, outputStreamList.size());
+ Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(0));
+ Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(1));
}
private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {
@@ -126,8 +157,8 @@ public class TestSamzaSqlApplicationConfig {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
new SamzaSqlApplicationConfig(new MapConfig(badConfigs),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
}
private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) {
@@ -138,8 +169,8 @@ public class TestSamzaSqlApplicationConfig {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
new SamzaSqlApplicationConfig(new MapConfig(badConfigs),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
Assert.fail();
} catch (IllegalArgumentException e) {
// swallow
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 0c9091d..d9039ec 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -19,6 +19,7 @@
package org.apache.samza.sql.translator;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.JsonUtil;
import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
import org.junit.Assert;
@@ -65,8 +67,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
@@ -92,6 +94,145 @@ public class TestQueryTranslator {
}
@Test
+ public void testTranslateFanIn() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+ String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE2";
+ String sql2 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+ List<String> sqlStmts = Arrays.asList(sql1, sql2);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+ StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
+ QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
+
+ translator.translate(queryInfo.get(0), appDesc, 0);
+ translator.translate(queryInfo.get(1), appDesc, 1);
+ OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
+
+ StreamConfig streamConfig = new StreamConfig(samzaConfig);
+ String inputStreamId1 = specGraph.getInputOperators().keySet().stream().findFirst().get();
+ String inputSystem1 = streamConfig.getSystem(inputStreamId1);
+ String inputPhysicalName1 = streamConfig.getPhysicalName(inputStreamId1);
+ String inputStreamId2 = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+ String inputSystem2 = streamConfig.getSystem(inputStreamId2);
+ String inputPhysicalName2 = streamConfig.getPhysicalName(inputStreamId2);
+
+ String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+ String outputSystem = streamConfig.getSystem(outputStreamId);
+ String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
+
+ Assert.assertEquals(1, specGraph.getOutputStreams().size());
+ Assert.assertEquals("testavro", outputSystem);
+ Assert.assertEquals("simpleOutputTopic", outputPhysicalName);
+
+ Assert.assertEquals(2, specGraph.getInputOperators().size());
+ Assert.assertEquals("testavro", inputSystem1);
+ Assert.assertEquals("SIMPLE2", inputPhysicalName1);
+ Assert.assertEquals("testavro", inputSystem2);
+ Assert.assertEquals("SIMPLE1", inputPhysicalName2);
+ }
+
+ @Test
+ public void testTranslateFanOut() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+ String sql1 = "Insert into testavro.SIMPLE2 select * from testavro.SIMPLE1";
+ String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE1";
+ List<String> sqlStmts = Arrays.asList(sql1, sql2);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+ StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
+ QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
+
+ translator.translate(queryInfo.get(0), appDesc, 0);
+ translator.translate(queryInfo.get(1), appDesc, 1);
+ OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
+
+ StreamConfig streamConfig = new StreamConfig(samzaConfig);
+ String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
+ String inputSystem = streamConfig.getSystem(inputStreamId);
+ String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId);
+ String outputStreamId1 = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+ String outputSystem1 = streamConfig.getSystem(outputStreamId1);
+ String outputPhysicalName1 = streamConfig.getPhysicalName(outputStreamId1);
+ String outputStreamId2 = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+ String outputSystem2 = streamConfig.getSystem(outputStreamId2);
+ String outputPhysicalName2 = streamConfig.getPhysicalName(outputStreamId2);
+
+ Assert.assertEquals(2, specGraph.getOutputStreams().size());
+ Assert.assertEquals("testavro", outputSystem1);
+ Assert.assertEquals("SIMPLE2", outputPhysicalName1);
+ Assert.assertEquals("testavro", outputSystem2);
+ Assert.assertEquals("SIMPLE3", outputPhysicalName2);
+
+ Assert.assertEquals(1, specGraph.getInputOperators().size());
+ Assert.assertEquals("testavro", inputSystem);
+ Assert.assertEquals("SIMPLE1", inputPhysicalName);
+ }
+
+ @Test
+ public void testTranslateMultiSql() {
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+ String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+ String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE2";
+ List<String> sqlStmts = Arrays.asList(sql1, sql2);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+ List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+ StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
+ QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
+
+ translator.translate(queryInfo.get(0), appDesc, 0);
+ translator.translate(queryInfo.get(1), appDesc, 1);
+ OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
+
+ StreamConfig streamConfig = new StreamConfig(samzaConfig);
+ String inputStreamId1 = specGraph.getInputOperators().keySet().stream().findFirst().get();
+ String inputSystem1 = streamConfig.getSystem(inputStreamId1);
+ String inputPhysicalName1 = streamConfig.getPhysicalName(inputStreamId1);
+ String inputStreamId2 = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+ String inputSystem2 = streamConfig.getSystem(inputStreamId2);
+ String inputPhysicalName2 = streamConfig.getPhysicalName(inputStreamId2);
+
+ String outputStreamId1 = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+ String outputSystem1 = streamConfig.getSystem(outputStreamId1);
+ String outputPhysicalName1 = streamConfig.getPhysicalName(outputStreamId1);
+ String outputStreamId2 = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+ String outputSystem2 = streamConfig.getSystem(outputStreamId2);
+ String outputPhysicalName2 = streamConfig.getPhysicalName(outputStreamId2);
+
+ Assert.assertEquals(2, specGraph.getOutputStreams().size());
+ Assert.assertEquals("testavro", outputSystem1);
+ Assert.assertEquals("simpleOutputTopic", outputPhysicalName1);
+ Assert.assertEquals("testavro", outputSystem2);
+ Assert.assertEquals("SIMPLE3", outputPhysicalName2);
+
+ Assert.assertEquals(2, specGraph.getInputOperators().size());
+ Assert.assertEquals("testavro", inputSystem1);
+ Assert.assertEquals("SIMPLE1", inputPhysicalName1);
+ Assert.assertEquals("testavro", inputSystem2);
+ Assert.assertEquals("SIMPLE2", inputPhysicalName2);
+ }
+
+ @Test
public void testTranslateComplex() {
Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
@@ -102,8 +243,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -139,8 +280,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -179,8 +320,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -204,8 +345,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -229,8 +370,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -254,8 +395,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -276,8 +417,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -300,8 +441,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -325,8 +466,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -349,8 +490,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -373,8 +514,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -397,8 +538,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -421,8 +562,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -449,8 +590,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -474,8 +615,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -541,8 +682,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -609,8 +750,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -676,8 +817,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -706,8 +847,8 @@ public class TestQueryTranslator {
List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
- .collect(Collectors.toSet()),
- queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+ .collect(Collectors.toList()),
+ queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 51cb1a9..3fc5750 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -138,7 +138,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
@Test
public void testEndToEndMultiSqlStmts() {
- int numMessages = 20;
+ int numMessages = 4;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
@@ -205,7 +205,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
@Test
public void testEndToEndFanOut() {
- int numMessages = 20;
+ int numMessages = 4;
TestAvroSystemFactory.messages.clear();
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
String sql1 = "Insert into testavro.SIMPLE2 select * from testavro.SIMPLE1";