You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/28 23:53:41 UTC

samza git commit: SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.

Repository: samza
Updated Branches:
  refs/heads/master 5069f1ddb -> ff607cb6b


SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.

This is required to support schema evolution without causing jobs to fail.

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>

Closes #821 from atoomula/modify and squashes the following commits:

17b4b1c1 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
65be581a [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
fb50ee81 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
9fff9573 [Aditya Toomula] SAMZA-2007: Samza-sql - Fix Samza-SQL to not expect LogicalTableModify in the Calcite plan.
f3e887c6 [Aditya Toomula] dummy


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ff607cb6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ff607cb6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ff607cb6

Branch: refs/heads/master
Commit: ff607cb6b61144ea6feb28842318e1eb1e5eaa5a
Parents: 5069f1d
Author: Aditya Toomula <at...@linkedin.com>
Authored: Wed Nov 28 15:53:30 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Wed Nov 28 15:53:30 2018 -0800

----------------------------------------------------------------------
 .../samza/sql/dsl/SamzaSqlDslConverter.java     |  16 +-
 .../samza/sql/runner/SamzaSqlApplication.java   |  10 +-
 .../sql/runner/SamzaSqlApplicationConfig.java   |  60 ++---
 .../sql/runner/SamzaSqlApplicationRunner.java   |   7 +-
 .../samza/sql/translator/ModifyTranslator.java  | 127 -----------
 .../samza/sql/translator/QueryTranslator.java   |  52 ++---
 .../runner/TestSamzaSqlApplicationConfig.java   |  55 ++++-
 .../sql/translator/TestQueryTranslator.java     | 221 +++++++++++++++----
 .../test/samzasql/TestSamzaSqlEndToEnd.java     |   4 +-
 9 files changed, 295 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
index 6cce906..d4cb134 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/dsl/SamzaSqlDslConverter.java
@@ -55,22 +55,20 @@ public class SamzaSqlDslConverter implements DslConverter {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig sqlConfig = new SamzaSqlApplicationConfig(config,
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     QueryPlanner planner =
-        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
+        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
             sqlConfig.getUdfMetadata());
 
     List<RelRoot> relRoots = new LinkedList<>();
     for (String sql: sqlStmts) {
-      // when sql is a query, we only pass the select query to the planner
+      // we always pass only select query to the planner for samza sql. The reason is that samza sql supports
+      // schema evolution where source and destination could up to an extent have independent schema evolution while
+      // calcite expects strict comformance of the destination schema with that of the fields in the select query.
       SamzaSqlQueryParser.QueryInfo qinfo = SamzaSqlQueryParser.parseQuery(sql);
-      if (qinfo.getSink().split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
-        sql = qinfo.getSelectQuery();
-      }
-
-      relRoots.add(planner.plan(sql));
+      relRoots.add(planner.plan(qinfo.getSelectQuery()));
     }
     return relRoots;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
index b8bb190..868f5a2 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplication.java
@@ -21,10 +21,9 @@ package org.apache.samza.sql.runner;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.calcite.rel.RelRoot;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
@@ -52,8 +51,8 @@ public class SamzaSqlApplication implements StreamApplication {
       Map<Integer, TranslatorContext> translatorContextMap = new HashMap<>();
 
       // 1. Get Calcite plan
-      Set<String> inputSystemStreams = new HashSet<>();
-      Set<String> outputSystemStreams = new HashSet<>();
+      List<String> inputSystemStreams = new LinkedList<>();
+      List<String> outputSystemStreams = new LinkedList<>();
 
       Collection<RelRoot> relRoots =
           SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, appDescriptor.getConfig(),
@@ -66,12 +65,13 @@ public class SamzaSqlApplication implements StreamApplication {
       // 3. Translate Calcite plan to Samza stream operators
       QueryTranslator queryTranslator = new QueryTranslator(appDescriptor, sqlConfig);
       SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
+      // QueryId implies the index of the query in multiple query statements scenario. It should always start with 0.
       int queryId = 0;
       for (RelRoot relRoot : relRoots) {
         LOG.info("Translating relRoot {} to samza stream graph with queryId {}", relRoot, queryId);
         TranslatorContext translatorContext = new TranslatorContext(appDescriptor, relRoot, executionContext);
         translatorContextMap.put(queryId, translatorContext);
-        queryTranslator.translate(relRoot, translatorContext, queryId);
+        queryTranslator.translate(relRoot, sqlConfig.getOutputSystemStreams().get(queryId), translatorContext, queryId);
         queryId++;
       }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 6e12c02..e608794 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -106,23 +107,33 @@ public class SamzaSqlApplicationConfig {
 
   private final Map<String, SqlIOConfig> inputSystemStreamConfigBySource;
   private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
-  private final Map<String, SqlIOConfig> systemStreamConfigsBySource;
+
+  // There could only be one output system stream per samza sql statement. The below list datastructure stores the
+  // output system streams in the order of SQL query statements. Please note that there could be duplicate entries
+  // in it during a fan-in scenario (e.g. two sql statements with two different input streams but same output stream).
+  private final List<String> outputSystemStreams;
 
   private final String metadataTopicPrefix;
   private final long windowDurationMs;
 
-  public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams,
-      Set<String> outputSystemStreams) {
+  public SamzaSqlApplicationConfig(Config staticConfig, List<String> inputSystemStreams,
+      List<String> outputSystemStreams) {
 
     ioResolver = createIOResolver(staticConfig);
 
-    inputSystemStreamConfigBySource = inputSystemStreams.stream()
+    this.outputSystemStreams = new LinkedList<>(outputSystemStreams);
+
+    // There could be duplicate streams across different queries. Let's dedupe them.
+    Set<String> inputSystemStreamSet = new HashSet<>(inputSystemStreams);
+    Set<String> outputSystemStreamSet = new HashSet<>(outputSystemStreams);
+
+    inputSystemStreamConfigBySource = inputSystemStreamSet.stream()
          .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src)));
 
-    outputSystemStreamConfigsBySource = outputSystemStreams.stream()
+    outputSystemStreamConfigsBySource = outputSystemStreamSet.stream()
          .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x)));
 
-    systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource);
+    Map<String, SqlIOConfig> systemStreamConfigsBySource = new HashMap<>(inputSystemStreamConfigBySource);
     systemStreamConfigsBySource.putAll(outputSystemStreamConfigsBySource);
 
     Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(systemStreamConfigsBySource.values());
@@ -140,7 +151,7 @@ public class SamzaSqlApplicationConfig {
                     relSchemaProvidersBySource.get(x.getSource()), c))));
 
     samzaRelTableKeyConvertersBySource = systemStreamConfigs.stream()
-        .filter(config -> config.isRemoteTable())
+        .filter(SqlIOConfig::isRemoteTable)
         .collect(Collectors.toMap(SqlIOConfig::getSource,
             x -> initializePlugin("SamzaRelTableKeyConverter", x.getSamzaRelTableKeyConverterName(),
                 staticConfig, CFG_FMT_SAMZA_REL_TABLE_KEY_CONVERTER_DOMAIN,
@@ -152,13 +163,6 @@ public class SamzaSqlApplicationConfig {
     metadataTopicPrefix =
         staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
     windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
-
-    // remove the SqlIOConfigs of outputs whose system is "log" out of systemStreamConfigsBySource
-    outputSystemStreamConfigsBySource.forEach((k, v) -> {
-        if (k.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
-            systemStreamConfigsBySource.remove(k);
-        }
-    });
   }
 
   public static <T> T initializePlugin(String pluginName, String plugin, Config staticConfig,
@@ -229,31 +233,35 @@ public class SamzaSqlApplicationConfig {
   }
 
   public static Collection<RelRoot> populateSystemStreamsAndGetRelRoots(List<String> dslStmts, Config config,
-      Set<String> inputSystemStreams, Set<String> outputSystemStreams) {
+      List<String> inputSystemStreams, List<String> outputSystemStreams) {
     // TODO: Get the converter factory based on the file type. Create abstraction around this.
     DslConverterFactory dslConverterFactory = new SamzaSqlDslConverterFactory();
     DslConverter dslConverter = dslConverterFactory.create(config);
 
     Collection<RelRoot> relRoots = dslConverter.convertDsl(String.join("\n", dslStmts));
 
-    // FIXME: the snippet below dose not work when sql is a query
+    // RelRoot does not have sink node for Samza SQL dsl, so we can not traverse the relRoot tree to get
+    // "outputSystemStreams"
+    // FIXME: the snippet below does not work for Samza SQL dsl but is required for other dsls. Future fix could be
+    // for samza sql to build TableModify for sink and stick it to the relRoot, so we could get output stream out of it.
+
     // for (RelRoot relRoot : relRoots) {
     //   SamzaSqlApplicationConfig.populateSystemStreams(relRoot.project(), inputSystemStreams, outputSystemStreams);
     // }
 
-    // RelRoot does not have sink node (aka. log.outputStream) when Sql statement is a query, so we
-    // can not traverse the tree of relRoot to get "outputSystemStreams"
+    // The below code is specific to Samza SQL dsl and should be removed once Samza SQL includes sink as part of
+    // relRoot and the above code in uncommented.
     List<String> sqlStmts = SamzaSqlDslConverter.fetchSqlFromConfig(config);
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = SamzaSqlDslConverter.fetchQueryInfo(sqlStmts);
     inputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-          .collect(Collectors.toSet()));
-    outputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+          .collect(Collectors.toList()));
+    outputSystemStreams.addAll(queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     return relRoots;
   }
 
-  private static void populateSystemStreams(RelNode relNode, Set<String> inputSystemStreams,
-      Set<String> outputSystemStreams) {
+  private static void populateSystemStreams(RelNode relNode, List<String> inputSystemStreams,
+      List<String> outputSystemStreams) {
     if (relNode instanceof TableModify) {
       outputSystemStreams.add(getSystemStreamName(relNode));
     } else {
@@ -282,6 +290,10 @@ public class SamzaSqlApplicationConfig {
     return udfMetadata;
   }
 
+  public List<String> getOutputSystemStreams() {
+    return outputSystemStreams;
+  }
+
   public Map<String, SqlIOConfig> getInputSystemStreamConfigBySource() {
     return inputSystemStreamConfigBySource;
   }
@@ -290,10 +302,6 @@ public class SamzaSqlApplicationConfig {
     return outputSystemStreamConfigsBySource;
   }
 
-  public Map<String, SqlIOConfig> getSystemStreamConfigsBySource() {
-    return systemStreamConfigsBySource;
-  }
-
   public Map<String, SamzaRelConverter> getSamzaRelConverters() {
     return samzaRelConvertersBySource;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 41e37f1..d9a44ec 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -21,10 +21,9 @@ package org.apache.samza.sql.runner;
 
 import java.time.Duration;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.commons.lang3.Validate;
 import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.config.Config;
@@ -83,8 +82,8 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
     String sqlJson = SamzaSqlApplicationConfig.serializeSqlStmts(dslStmts);
     newConfig.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, sqlJson);
 
-    Set<String> inputSystemStreams = new HashSet<>();
-    Set<String> outputSystemStreams = new HashSet<>();
+    List<String> inputSystemStreams = new LinkedList<>();
+    List<String> outputSystemStreams = new LinkedList<>();
 
     SamzaSqlApplicationConfig.populateSystemStreamsAndGetRelRoots(dslStmts, config, inputSystemStreams,
         outputSystemStreams);

http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
deleted file mode 100644
index dbeabab..0000000
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*   http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.samza.sql.translator;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.calcite.rel.core.TableModify;
-import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
-import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.system.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.interfaces.SamzaRelConverter;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
-import org.apache.samza.table.Table;
-
-
-/**
- * Translator to translate the TableModify in relational graph to the corresponding output streams in the StreamGraph
- * implementation
- */
-class ModifyTranslator {
-
-  private final Map<String, SamzaRelConverter> relMsgConverters;
-  private final Map<String, SqlIOConfig> systemStreamConfig;
-  private final int queryId;
-
-  ModifyTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc, int queryId) {
-    relMsgConverters = converters;
-    this.systemStreamConfig = ssc;
-    this.queryId = queryId;
-  }
-
-  // OutputMapFunction converts SamzaSqlRelMessage to SamzaMessage in KV format
-  private static class OutputMapFunction implements MapFunction<SamzaSqlRelMessage, KV<Object, Object>> {
-    // All the user-supplied functions are expected to be serializable in order to enable full serialization of user
-    // DAG. We do not want to serialize samzaMsgConverter as it can be fully constructed during stream operator
-    // initialization.
-    private transient SamzaRelConverter samzaMsgConverter;
-    private final String outputTopic;
-    private final int queryId;
-
-    OutputMapFunction(String outputTopic, int queryId) {
-      this.outputTopic = outputTopic;
-      this.queryId = queryId;
-    }
-
-    @Override
-    public void init(Context context) {
-      TranslatorContext translatorContext =
-          ((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
-      this.samzaMsgConverter = translatorContext.getMsgConverter(outputTopic);
-    }
-
-    @Override
-    public KV<Object, Object> apply(SamzaSqlRelMessage message) {
-      return this.samzaMsgConverter.convertToSamzaMessage(message);
-    }
-  }
-
-  void translate(final TableModify tableModify, final TranslatorContext context, Map<String, DelegatingSystemDescriptor> systemDescriptors,
-      Map<String, OutputStream> outputMsgStreams) {
-
-    StreamApplicationDescriptor streamAppDesc = context.getStreamAppDescriptor();
-    List<String> tableNameParts = tableModify.getTable().getQualifiedName();
-    String targetName = SqlIOConfig.getSourceFromSourceParts(tableNameParts);
-
-    Validate.isTrue(relMsgConverters.containsKey(targetName), String.format("Unknown source %s", targetName));
-
-    SqlIOConfig sinkConfig = systemStreamConfig.get(targetName);
-
-    final String systemName = sinkConfig.getSystemName();
-    final String streamId = sinkConfig.getStreamId();
-    final String source = sinkConfig.getSource();
-
-    KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
-    DelegatingSystemDescriptor
-        sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
-    GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamId, noOpKVSerde);
-
-    MessageStreamImpl<SamzaSqlRelMessage> stream =
-        (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(tableModify.getInput().getId());
-    MessageStream<KV<Object, Object>> outputStream = stream.map(new OutputMapFunction(targetName, queryId));
-
-    Optional<TableDescriptor> tableDescriptor = sinkConfig.getTableDescriptor();
-    if (!tableDescriptor.isPresent()) {
-      OutputStream stm = outputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getOutputStream(osd));
-      outputStream.sendTo(stm);
-    } else {
-      Table outputTable = streamAppDesc.getTable(tableDescriptor.get());
-      if (outputTable == null) {
-        String msg = "Failed to obtain table descriptor of " + sinkConfig.getSource();
-        throw new SamzaException(msg);
-      }
-      outputStream.sendTo(outputTable);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index bb34a41..fef5471 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -32,6 +32,7 @@ import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.LogicalProject;
+import org.apache.commons.lang.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.context.Context;
@@ -71,7 +72,6 @@ public class QueryTranslator {
     private transient SamzaRelConverter samzaMsgConverter;
     private final String outputTopic;
     private final int queryId;
-    static OutputStream logOutputStream;
 
     OutputMapFunction(String outputTopic, int queryId) {
       this.outputTopic = outputTopic;
@@ -103,14 +103,14 @@ public class QueryTranslator {
    * For unit testing only
    */
   @VisibleForTesting
-  public void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
+  void translate(SamzaSqlQueryParser.QueryInfo queryInfo, StreamApplicationDescriptor appDesc, int queryId) {
     QueryPlanner planner =
-        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getSystemStreamConfigsBySource(),
+        new QueryPlanner(sqlConfig.getRelSchemaProviders(), sqlConfig.getInputSystemStreamConfigBySource(),
             sqlConfig.getUdfMetadata());
-    final RelRoot relRoot = planner.plan(queryInfo.getSql());
+    final RelRoot relRoot = planner.plan(queryInfo.getSelectQuery());
     SamzaSqlExecutionContext executionContext = new SamzaSqlExecutionContext(sqlConfig);
     TranslatorContext translatorContext = new TranslatorContext(appDesc, relRoot, executionContext);
-    translate(relRoot, translatorContext, queryId);
+    translate(relRoot, sqlConfig.getOutputSystemStreams().get(queryId), translatorContext, queryId);
     Map<Integer, TranslatorContext> translatorContexts = new HashMap<>();
     translatorContexts.put(queryId, translatorContext.clone());
     appDesc.withApplicationTaskContextFactory((jobContext,
@@ -120,12 +120,19 @@ public class QueryTranslator {
         new SamzaSqlApplicationContext(translatorContexts));
   }
 
-  public void translate(RelRoot relRoot, TranslatorContext translatorContext, int queryId) {
+  /**
+   * Translate Calcite plan to Samza stream operators.
+   * @param relRoot Calcite plan in the form of {@link RelRoot}. RelRoot should not include the sink ({@link TableModify})
+   * @param outputSystemStream Sink associated with the Calcite plan.
+   * @param translatorContext Context maintained across translations.
+   * @param queryId query index of the sql statement corresponding to the Calcite plan in multi SQL statement scenario
+   *                starting with index 0.
+   */
+  public void translate(RelRoot relRoot, String outputSystemStream, TranslatorContext translatorContext, int queryId) {
     final RelNode node = relRoot.project();
+
     ScanTranslator scanTranslator =
         new ScanTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getInputSystemStreamConfigBySource(), queryId);
-    ModifyTranslator modifyTranslator =
-        new ModifyTranslator(sqlConfig.getSamzaRelConverters(), sqlConfig.getOutputSystemStreamConfigsBySource(), queryId);
 
     node.accept(new RelShuttleImpl() {
       int windowId = 0;
@@ -134,21 +141,11 @@ public class QueryTranslator {
 
       @Override
       public RelNode visit(RelNode relNode) {
-        if (relNode instanceof TableModify) {
-          return visit((TableModify) relNode);
-        }
+        // There should never be a TableModify in the calcite plan.
+        Validate.isTrue(!(relNode instanceof TableModify));
         return super.visit(relNode);
       }
 
-      private RelNode visit(TableModify modify) {
-        if (!modify.isInsert()) {
-          throw new SamzaException("Not a supported operation: " + modify.toString());
-        }
-        RelNode node = super.visit(modify);
-        modifyTranslator.translate(modify, translatorContext, systemDescriptors, outputMsgStreams);
-        return node;
-      }
-
       @Override
       public RelNode visit(TableScan scan) {
         RelNode node = super.visit(scan);
@@ -190,14 +187,7 @@ public class QueryTranslator {
       }
     });
 
-    // the snippet below will be performed only when sql is a query statement
-    sqlConfig.getOutputSystemStreamConfigsBySource().keySet().forEach(
-        key -> {
-          if (key.split("\\.")[0].equals(SamzaSqlApplicationConfig.SAMZA_SYSTEM_LOG)) {
-            sendToOutputStream(key, streamAppDescriptor, translatorContext, node, queryId);
-          }
-        }
-    );
+    sendToOutputStream(outputSystemStream, streamAppDescriptor, translatorContext, node, queryId);
   }
 
   private void sendToOutputStream(String sinkStream, StreamApplicationDescriptor appDesc, TranslatorContext context, RelNode node, int queryId) {
@@ -211,10 +201,8 @@ public class QueryTranslator {
       DelegatingSystemDescriptor
           sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
       GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
-      if (OutputMapFunction.logOutputStream == null) {
-        OutputMapFunction.logOutputStream = appDesc.getOutputStream(osd);
-      }
-      outputStream.sendTo(OutputMapFunction.logOutputStream);
+      OutputStream stm = outputMsgStreams.computeIfAbsent(sinkConfig.getSource(), v -> appDesc.getOutputStream(osd));
+      outputStream.sendTo(stm);
     } else {
       Table outputTable = appDesc.getTable(tableDescriptor.get());
       if (outputTable == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index 46c0651..294eccd 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -37,7 +37,6 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.samza.sql.dsl.SamzaSqlDslConverter.*;
-import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.*;
 
 
 public class TestSamzaSqlApplicationConfig {
@@ -53,8 +52,8 @@ public class TestSamzaSqlApplicationConfig {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     Assert.assertEquals(numUdfs, samzaSqlApplicationConfig.getUdfMetadata().size());
     Assert.assertEquals(1, samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().size());
@@ -80,8 +79,8 @@ public class TestSamzaSqlApplicationConfig {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_IO_RESOLVER);
     testWithoutConfigShouldFail(config, SamzaSqlApplicationConfig.CFG_UDF_RESOLVER);
@@ -98,7 +97,7 @@ public class TestSamzaSqlApplicationConfig {
   }
 
   @Test
-  public void testGetInputAndOutputStreamConfigs() {
+  public void testGetInputAndOutputStreamConfigsFanOut() {
     List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1",
         "insert into testavro.Profile select * from testavro.SIMPLE1");
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
@@ -107,16 +106,48 @@ public class TestSamzaSqlApplicationConfig {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet();
     Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet();
+    List<String> outputStreamList = samzaSqlApplicationConfig.getOutputSystemStreams();
+
     Assert.assertEquals(1, inputKeys.size());
     Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1"));
     Assert.assertEquals(2, outputKeys.size());
     Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1"));
     Assert.assertTrue(outputKeys.contains("testavro.Profile"));
+    Assert.assertEquals(2, outputStreamList.size());
+    Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(0));
+    Assert.assertEquals("testavro.Profile", outputStreamList.get(1));
+  }
+
+  @Test
+  public void testGetInputAndOutputStreamConfigsFanIn() {
+    List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1",
+        "insert into testavro.COMPLEX1 select * from testavro.SIMPLE2");
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+    Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet();
+    Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet();
+    List<String> outputStreamList = samzaSqlApplicationConfig.getOutputSystemStreams();
+
+    Assert.assertEquals(2, inputKeys.size());
+    Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1"));
+    Assert.assertTrue(inputKeys.contains("testavro.SIMPLE2"));
+    Assert.assertEquals(1, outputKeys.size());
+    Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1"));
+    Assert.assertEquals(2, outputStreamList.size());
+    Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(0));
+    Assert.assertEquals("testavro.COMPLEX1", outputStreamList.get(1));
   }
 
   private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {
@@ -126,8 +157,8 @@ public class TestSamzaSqlApplicationConfig {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     new SamzaSqlApplicationConfig(new MapConfig(badConfigs),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
   }
 
   private void testWithoutConfigShouldFail(Map<String, String> config, String configKey) {
@@ -138,8 +169,8 @@ public class TestSamzaSqlApplicationConfig {
       List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
       new SamzaSqlApplicationConfig(new MapConfig(badConfigs),
           queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-              .collect(Collectors.toSet()),
-          queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+              .collect(Collectors.toList()),
+          queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
       Assert.fail();
     } catch (IllegalArgumentException e) {
       // swallow

http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index 0c9091d..d9039ec 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.translator;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -34,6 +35,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
 import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationRunner;
+import org.apache.samza.sql.testutil.JsonUtil;
 import org.apache.samza.sql.testutil.SamzaSqlQueryParser;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.junit.Assert;
@@ -65,8 +67,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
     QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
@@ -92,6 +94,145 @@ public class TestQueryTranslator {
   }
 
   @Test
+  public void testTranslateFanIn() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE2";
+    String sql2 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql1, sql2);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
+    QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
+
+    translator.translate(queryInfo.get(0), appDesc, 0);
+    translator.translate(queryInfo.get(1), appDesc, 1);
+    OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId1 = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem1 = streamConfig.getSystem(inputStreamId1);
+    String inputPhysicalName1 = streamConfig.getPhysicalName(inputStreamId1);
+    String inputStreamId2 = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+    String inputSystem2 = streamConfig.getSystem(inputStreamId2);
+    String inputPhysicalName2 = streamConfig.getPhysicalName(inputStreamId2);
+
+    String outputStreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem = streamConfig.getSystem(outputStreamId);
+    String outputPhysicalName = streamConfig.getPhysicalName(outputStreamId);
+
+    Assert.assertEquals(1, specGraph.getOutputStreams().size());
+    Assert.assertEquals("testavro", outputSystem);
+    Assert.assertEquals("simpleOutputTopic", outputPhysicalName);
+
+    Assert.assertEquals(2, specGraph.getInputOperators().size());
+    Assert.assertEquals("testavro", inputSystem1);
+    Assert.assertEquals("SIMPLE2", inputPhysicalName1);
+    Assert.assertEquals("testavro", inputSystem2);
+    Assert.assertEquals("SIMPLE1", inputPhysicalName2);
+  }
+
+  @Test
+  public void testTranslateFanOut() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    String sql1 = "Insert into testavro.SIMPLE2 select * from testavro.SIMPLE1";
+    String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql1, sql2);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
+    QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
+
+    translator.translate(queryInfo.get(0), appDesc, 0);
+    translator.translate(queryInfo.get(1), appDesc, 1);
+    OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem = streamConfig.getSystem(inputStreamId);
+    String inputPhysicalName = streamConfig.getPhysicalName(inputStreamId);
+    String outputStreamId1 = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem1 = streamConfig.getSystem(outputStreamId1);
+    String outputPhysicalName1 = streamConfig.getPhysicalName(outputStreamId1);
+    String outputStreamId2 = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+    String outputSystem2 = streamConfig.getSystem(outputStreamId2);
+    String outputPhysicalName2 = streamConfig.getPhysicalName(outputStreamId2);
+
+    Assert.assertEquals(2, specGraph.getOutputStreams().size());
+    Assert.assertEquals("testavro", outputSystem1);
+    Assert.assertEquals("SIMPLE2", outputPhysicalName1);
+    Assert.assertEquals("testavro", outputSystem2);
+    Assert.assertEquals("SIMPLE3", outputPhysicalName2);
+
+    Assert.assertEquals(1, specGraph.getInputOperators().size());
+    Assert.assertEquals("testavro", inputSystem);
+    Assert.assertEquals("SIMPLE1", inputPhysicalName);
+  }
+
+  @Test
+  public void testTranslateMultiSql() {
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+    String sql2 = "Insert into testavro.SIMPLE3 select * from testavro.SIMPLE2";
+    List<String> sqlStmts = Arrays.asList(sql1, sql2);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
+
+    List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
+
+    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(streamApp -> { },samzaConfig);
+    QueryTranslator translator = new QueryTranslator(appDesc, samzaSqlApplicationConfig);
+
+    translator.translate(queryInfo.get(0), appDesc, 0);
+    translator.translate(queryInfo.get(1), appDesc, 1);
+    OperatorSpecGraph specGraph = appDesc.getOperatorSpecGraph();
+
+    StreamConfig streamConfig = new StreamConfig(samzaConfig);
+    String inputStreamId1 = specGraph.getInputOperators().keySet().stream().findFirst().get();
+    String inputSystem1 = streamConfig.getSystem(inputStreamId1);
+    String inputPhysicalName1 = streamConfig.getPhysicalName(inputStreamId1);
+    String inputStreamId2 = specGraph.getInputOperators().keySet().stream().skip(1).findFirst().get();
+    String inputSystem2 = streamConfig.getSystem(inputStreamId2);
+    String inputPhysicalName2 = streamConfig.getPhysicalName(inputStreamId2);
+
+    String outputStreamId1 = specGraph.getOutputStreams().keySet().stream().findFirst().get();
+    String outputSystem1 = streamConfig.getSystem(outputStreamId1);
+    String outputPhysicalName1 = streamConfig.getPhysicalName(outputStreamId1);
+    String outputStreamId2 = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
+    String outputSystem2 = streamConfig.getSystem(outputStreamId2);
+    String outputPhysicalName2 = streamConfig.getPhysicalName(outputStreamId2);
+
+    Assert.assertEquals(2, specGraph.getOutputStreams().size());
+    Assert.assertEquals("testavro", outputSystem1);
+    Assert.assertEquals("simpleOutputTopic", outputPhysicalName1);
+    Assert.assertEquals("testavro", outputSystem2);
+    Assert.assertEquals("SIMPLE3", outputPhysicalName2);
+
+    Assert.assertEquals(2, specGraph.getInputOperators().size());
+    Assert.assertEquals("testavro", inputSystem1);
+    Assert.assertEquals("SIMPLE1", inputPhysicalName1);
+    Assert.assertEquals("testavro", inputSystem2);
+    Assert.assertEquals("SIMPLE2", inputPhysicalName2);
+  }
+
+  @Test
   public void testTranslateComplex() {
     Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT,
@@ -102,8 +243,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -139,8 +280,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -179,8 +320,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -204,8 +345,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -229,8 +370,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -254,8 +395,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -276,8 +417,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -300,8 +441,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -325,8 +466,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -349,8 +490,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -373,8 +514,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -397,8 +538,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -421,8 +562,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -449,8 +590,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -474,8 +615,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -541,8 +682,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -609,8 +750,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -676,8 +817,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);
@@ -706,8 +847,8 @@ public class TestQueryTranslator {
     List<SamzaSqlQueryParser.QueryInfo> queryInfo = fetchQueryInfo(sqlStmts);
     SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config),
         queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSources).flatMap(Collection::stream)
-            .collect(Collectors.toSet()),
-        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toSet()));
+            .collect(Collectors.toList()),
+        queryInfo.stream().map(SamzaSqlQueryParser.QueryInfo::getSink).collect(Collectors.toList()));
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(streamApp -> { }, samzaConfig);
     QueryTranslator translator = new QueryTranslator(streamAppDesc, samzaSqlApplicationConfig);

http://git-wip-us.apache.org/repos/asf/samza/blob/ff607cb6/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 51cb1a9..3fc5750 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -138,7 +138,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
 
   @Test
   public void testEndToEndMultiSqlStmts() {
-    int numMessages = 20;
+    int numMessages = 4;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
@@ -205,7 +205,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
 
   @Test
   public void testEndToEndFanOut() {
-    int numMessages = 20;
+    int numMessages = 4;
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
     String sql1 = "Insert into testavro.SIMPLE2 select * from testavro.SIMPLE1";