You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/12 19:42:57 UTC

samza git commit: SAMZA-1986: Samza-sql: Use system name along with stream name for streamId

Repository: samza
Updated Branches:
  refs/heads/master 6dd3e1823 -> 1d6f693c8


SAMZA-1986: Samza-sql: Use system name along with stream name for streamId

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>

Closes #800 from atoomula/streamid


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1d6f693c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1d6f693c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1d6f693c

Branch: refs/heads/master
Commit: 1d6f693c84c7476d0d51a3a47e41908f7ddd1646
Parents: 6dd3e18
Author: Aditya Toomula <at...@linkedin.com>
Authored: Mon Nov 12 11:42:51 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Mon Nov 12 11:42:51 2018 -0800

----------------------------------------------------------------------
 .../samza/sql/interfaces/SqlIOConfig.java       | 29 +++++++------
 .../sql/runner/SamzaSqlApplicationRunner.java   |  4 +-
 .../samza/sql/translator/ModifyTranslator.java  |  4 +-
 .../samza/sql/translator/QueryTranslator.java   |  2 +-
 .../samza/sql/translator/ScanTranslator.java    |  4 +-
 .../runner/TestSamzaSqlApplicationRunner.java   |  8 ++--
 .../samza/sql/system/TestAvroSystemFactory.java | 34 ++++++++++-----
 .../samza/sql/testutil/SamzaSqlTestConfig.java  | 23 ++++++++++
 .../test/samzasql/TestSamzaSqlEndToEnd.java     | 45 ++++++++++++++++++++
 9 files changed, 119 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 3ef1795..5fa30e7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -43,9 +43,7 @@ public class SqlIOConfig {
   public static final String CFG_SAMZA_REL_TABLE_KEY_CONVERTER = "samzaRelTableKeyConverterName";
   public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
 
-  private final String systemName;
-
-  private final String streamName;
+  private final String streamId;
 
   private final String samzaRelConverterName;
   private final String samzaRelTableKeyConverterName;
@@ -71,13 +69,16 @@ public class SqlIOConfig {
   public SqlIOConfig(String systemName, String streamName, List<String> sourceParts,
       Config systemConfig, TableDescriptor tableDescriptor) {
     HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
-    this.systemName = systemName;
-    this.streamName = streamName;
     this.source = getSourceFromSourceParts(sourceParts);
     this.sourceParts = sourceParts;
     this.systemStream = new SystemStream(systemName, streamName);
     this.tableDescriptor = Optional.ofNullable(tableDescriptor);
 
+    // A remote table has no backing stream associated with it, so a streamId is not meaningful for it; we still set
+    // one for uniformity. A remote table has a table descriptor defined, while a local table has both a backing
+    // stream and a table descriptor defined.
+    this.streamId = String.format("%s-%s", systemName, streamName);
+
     samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
     Validate.notEmpty(samzaRelConverterName,
         String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
@@ -96,10 +97,14 @@ public class SqlIOConfig {
     streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
     streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
 
-    // Currently, only local table is supported. And it is assumed that all tables are local tables.
-    if (tableDescriptor != null) {
-      streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamName), "true");
-      streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamName), "oldest");
+    if (!isRemoteTable()) {
+      // The config below is required for local tables and streams, but not for remote tables.
+      streamConfigs.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), streamId), streamName);
+      if (tableDescriptor != null) {
+        // For local table, set the bootstrap config and default offset to oldest
+        streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamId), "true");
+        streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamId), "oldest");
+      }
     }
 
     config = new MapConfig(streamConfigs);
@@ -114,11 +119,11 @@ public class SqlIOConfig {
   }
 
   public String getSystemName() {
-    return systemName;
+    return systemStream.getSystem();
   }
 
-  public String getStreamName() {
-    return streamName;
+  public String getStreamId() {
+    return streamId;
   }
 
   public String getSamzaRelConverterName() {

http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 9d361fb..41e37f1 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -94,14 +94,14 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
     // Populate stream to system mapping config for input and output system streams
     for (String source : inputSystemStreams) {
       SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(source);
-      newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
+      newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamId()),
           inputSystemStreamConfig.getSystemName());
       newConfig.putAll(inputSystemStreamConfig.getConfig());
     }
 
     for (String sink : outputSystemStreams) {
       SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(sink);
-      newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
+      newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamId()),
           outputSystemStreamConfig.getSystemName());
       newConfig.putAll(outputSystemStreamConfig.getConfig());
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
index b26f4a7..dbeabab 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
@@ -99,13 +99,13 @@ class ModifyTranslator {
     SqlIOConfig sinkConfig = systemStreamConfig.get(targetName);
 
     final String systemName = sinkConfig.getSystemName();
-    final String streamName = sinkConfig.getStreamName();
+    final String streamId = sinkConfig.getStreamId();
     final String source = sinkConfig.getSource();
 
     KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
     DelegatingSystemDescriptor
         sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
-    GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamName, noOpKVSerde);
+    GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamId, noOpKVSerde);
 
     MessageStreamImpl<SamzaSqlRelMessage> stream =
         (MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(tableModify.getInput().getId());

http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 7f3c11e..a826f9f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -209,7 +209,7 @@ public class QueryTranslator {
       String systemName = sinkConfig.getSystemName();
       DelegatingSystemDescriptor
           sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
-      GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
+      GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
       if (OutputMapFunction.logOutputStream == null) {
         OutputMapFunction.logOutputStream = appDesc.getOutputStream(osd);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 2615aad..2a5a0e8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -90,7 +90,7 @@ class ScanTranslator {
     Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName));
     SqlIOConfig sqlIOConfig = systemStreamConfig.get(sourceName);
     final String systemName = sqlIOConfig.getSystemName();
-    final String streamName = sqlIOConfig.getStreamName();
+    final String streamId = sqlIOConfig.getStreamId();
     final String source = sqlIOConfig.getSource();
 
     final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
@@ -107,7 +107,7 @@ class ScanTranslator {
     KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
     DelegatingSystemDescriptor
         sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
-    GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde);
+    GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamId, noOpKVSerde);
 
     MessageStream<KV<Object, Object>> inputStream = inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd));
     MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName, queryId));

http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index f0df3a9..ccab449 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -40,13 +40,13 @@ public class TestSamzaSqlApplicationRunner {
     MapConfig samzaConfig = new MapConfig(configs);
     Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, samzaConfig);
     Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), LocalApplicationRunner.class.getName());
-    // Check whether three new configs added.
-    Assert.assertEquals(newConfigs.size(), configs.size() + 3);
+    // Check whether five new configs were added.
+    Assert.assertEquals(newConfigs.size(), configs.size() + 5);
 
     newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, samzaConfig);
     Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), RemoteApplicationRunner.class.getName());
 
-    // Check whether three new configs added.
-    Assert.assertEquals(newConfigs.size(), configs.size() + 3);
+    // Check whether five new configs were added.
+    Assert.assertEquals(newConfigs.size(), configs.size() + 5);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 9a9e269..f7d13a4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -49,6 +49,7 @@ import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -164,17 +165,28 @@ public class TestAvroSystemFactory implements SystemFactory {
 
     @Override
     public void register(SystemStreamPartition systemStreamPartition, String offset) {
-      if (systemStreamPartition.getStream().toLowerCase().contains("simple1")) {
-        simpleRecordSsps.add(systemStreamPartition);
-      }
-      if (systemStreamPartition.getStream().toLowerCase().contains("profile")) {
-        profileRecordSsps.add(systemStreamPartition);
-      }
-      if (systemStreamPartition.getStream().toLowerCase().contains("company")) {
-        companyRecordSsps.add(systemStreamPartition);
-      }
-      if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) {
-        pageViewRecordSsps.add(systemStreamPartition);
+      switch (systemStreamPartition.getStream().toLowerCase()) {
+        case "simple1":
+          simpleRecordSsps.add(systemStreamPartition);
+          break;
+        case "profile":
+          profileRecordSsps.add(systemStreamPartition);
+          break;
+        case "company":
+          companyRecordSsps.add(systemStreamPartition);
+          break;
+        case "pageview":
+          pageViewRecordSsps.add(systemStreamPartition);
+          break;
+        case "complex1":
+          break;
+        case "simple2":
+          break;
+        case "simple3":
+          break;
+        default:
+          Assert.assertTrue(String.format("ssp %s is not recognized", systemStreamPartition), false);
+          break;
       }
       curMessagesPerSsp.put(systemStreamPartition, 0);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 80b47eb..f9f4124 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -53,6 +53,8 @@ import static org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory.TES
 public class SamzaSqlTestConfig {
 
   public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
+  public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2";
+  public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
 
   public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
     return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
@@ -109,6 +111,24 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_TABLE_KEY_CONVERTER, "sample");
     staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
 
+    String avro2SystemConfigPrefix =
+            String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO2);
+    String avro2SamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO2);
+    staticConfigs.put(avro2SystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
+    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
+            String.valueOf(numberOfMessages));
+    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
+            includeNullForeignKeys ? "true" : "false");
+    staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
+            String.valueOf(windowDurationMs / 2));
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
+    staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+    String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB);
+    staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+    staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
     String avroSamzaToRelMsgConverterDomain =
         String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
     staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -133,6 +153,9 @@ public class SamzaSqlTestConfig {
         "testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
 
     staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+        "testavro2", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+
+    staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
         "testavro", "SIMPLE2"), SimpleRecord.SCHEMA$.toString());
 
     staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,

http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 77538ff..3f2148c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -90,6 +90,26 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
   }
 
   @Test
+  public void testEndToEndWithDifferentSystemSameStream() {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql = "Insert into testavro2.SIMPLE1 select * from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+        .sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
+  }
+
+  @Test
   public void testEndToEndMultiSqlStmts() {
     int numMessages = 20;
     TestAvroSystemFactory.messages.clear();
@@ -110,6 +130,31 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
     Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
   }
 
+  // The test below won't work until SAMZA-1990 is fixed. Currently, the Samza framework does not allow the same
+  // system stream to be used as both an input and an output stream.
+  @Ignore
+  @Test
+  public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
+    int numMessages = 20;
+    TestAvroSystemFactory.messages.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+    String sql1 = "Insert into testavro.SIMPLE1 select * from testavro.SIMPLE2";
+    String sql2 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+    List<String> sqlStmts = Arrays.asList(sql1, sql2);
+
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+    runner.runAndWaitForFinish();
+    List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+        .sorted()
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages * 2, outMessages.size());
+    Set<Integer> outMessagesSet = new HashSet<>(outMessages);
+    Assert.assertEquals(numMessages, outMessagesSet.size());
+    Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
+  }
+
   @Test
   public void testEndToEndFanIn() {
     int numMessages = 20;