You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/25 21:16:46 UTC

samza git commit: SAMZA-1927: Samza-sql - always repartition the stream denoted by stream-table join.

Repository: samza
Updated Branches:
  refs/heads/master 43a501b49 -> 58f18117c


SAMZA-1927: Samza-sql - always repartition the stream denoted by stream-table join.

Author: Aditya Toomula <at...@linkedin.com>

Reviewers: Srinivasulu Punuru <sp...@linkedin.com>

Closes #676 from atoomula/dsl3 and squashes the following commits:

e86ef83c [Aditya Toomula] Adding metadatastream prefix config. This will be used to reset both the intermediate streams and changelogstore streams by changing the prefix name.
14450264 [Aditya Toomula] Adding metadatastream prefix config. This will be used to reset both the intermediate streams and changelogstore streams by changing the prefix name.
c3289673 [Aditya Toomula] Adding changelogstreamname prefix config. This will be used to reset the state by changing the prefix name.
804b07a1 [Aditya Toomula] SAMZA-1927: Samza-sql - always repartition the stream denoted by stream-table join.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/58f18117
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/58f18117
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/58f18117

Branch: refs/heads/master
Commit: 58f18117ccddc62cf4c74325faa8dfa6ad9ec8e3
Parents: 43a501b
Author: Aditya Toomula <at...@linkedin.com>
Authored: Thu Oct 25 14:16:24 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Thu Oct 25 14:16:24 2018 -0700

----------------------------------------------------------------------
 .../sql/impl/ConfigBasedIOResolverFactory.java  | 13 +++-
 .../sql/runner/SamzaSqlApplicationConfig.java   | 20 +++++-
 .../samza/sql/translator/JoinTranslator.java    | 18 +++++-
 .../translator/LogicalAggregateTranslator.java  |  7 +-
 .../samza/sql/translator/QueryTranslator.java   |  6 +-
 .../sql/testutil/TestIOResolverFactory.java     | 12 +++-
 .../sql/translator/TestJoinTranslator.java      |  2 +-
 .../sql/translator/TestQueryTranslator.java     | 67 ++++++++++++++------
 8 files changed, 111 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 7faff17..2514d30 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -25,14 +25,17 @@ import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.sql.data.SamzaSqlCompositeKey;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
+
 
 /**
  * Source Resolver implementation that uses static config to return a config corresponding to a system stream.
@@ -54,9 +57,12 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
   private class ConfigBasedIOResolver implements SqlIOResolver {
     private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
     private final Config config;
+    private final String changeLogStorePrefix;
 
     public ConfigBasedIOResolver(Config config) {
       this.config = config;
+      String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+      this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
     }
 
     @Override
@@ -100,9 +106,10 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
 
       TableDescriptor tableDescriptor = null;
       if (isTable) {
-        tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name, KVSerde.of(
+        String tableId = changeLogStorePrefix + "InputTable-" + name.replace(".", "-").replace("$", "-");
+        tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
             new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
-            new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+            new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled();
       }
 
       return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor);

http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 745c934..dcb5043 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.sql.runner;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -82,10 +83,13 @@ public class SamzaSqlApplicationConfig {
   public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
   public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s.";
 
+  public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix";
   public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
 
   public static final String SAMZA_SYSTEM_LOG = "log";
 
+  public static final String DEFAULT_METADATA_TOPIC_PREFIX = "";
+
   private static final long DEFAULT_GROUPBY_WINDOW_DURATION_MS = 300000; // default groupby window duration is 5 mins.
 
   private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
@@ -100,6 +104,7 @@ public class SamzaSqlApplicationConfig {
   private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
   private final Map<String, SqlIOConfig> systemStreamConfigsBySource;
 
+  private final String metadataTopicPrefix;
   private final long windowDurationMs;
 
   public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams,
@@ -133,6 +138,8 @@ public class SamzaSqlApplicationConfig {
     udfResolver = createUdfResolver(staticConfig);
     udfMetadata = udfResolver.getUdfs();
 
+    metadataTopicPrefix =
+        staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
     windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
 
     // remove the SqlIOConfigs of outputs whose system is "log" out of systemStreamConfigsBySource
@@ -168,9 +175,14 @@ public class SamzaSqlApplicationConfig {
 
   public static SqlIOResolver createIOResolver(Config config) {
     String sourceResolveValue = config.get(CFG_IO_RESOLVER);
+    Map<String, String> metadataPrefixProperties = new HashMap<>();
+    metadataPrefixProperties.put(
+        String.format(CFG_FMT_SOURCE_RESOLVER_DOMAIN, sourceResolveValue) + CFG_METADATA_TOPIC_PREFIX,
+        config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX));
+    Config newConfig = new MapConfig(Arrays.asList(config, metadataPrefixProperties));
     Validate.notEmpty(sourceResolveValue, "ioResolver config is not set or empty");
-    return initializePlugin("SqlIOResolver", sourceResolveValue, config, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
-        (o, c) -> ((SqlIOResolverFactory) o).create(c, config));
+    return initializePlugin("SqlIOResolver", sourceResolveValue, newConfig, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
+        (o, c) -> ((SqlIOResolverFactory) o).create(c, newConfig));
   }
 
   private UdfResolver createUdfResolver(Map<String, String> config) {
@@ -283,6 +295,10 @@ public class SamzaSqlApplicationConfig {
     return ioResolver;
   }
 
+  public String getMetadataTopicPrefix() {
+    return metadataTopicPrefix;
+  }
+
   public long getWindowDurationMs() {
     return windowDurationMs;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index ac2c64d..0761898 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -76,10 +76,12 @@ class JoinTranslator {
   private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
   private int joinId;
   private SqlIOResolver ioResolver;
+  private final String intermediateStreamPrefix;
 
-  JoinTranslator(int joinId, SqlIOResolver ioResolver) {
+  JoinTranslator(int joinId, SqlIOResolver ioResolver, String intermediateStreamPrefix) {
     this.joinId = joinId;
     this.ioResolver = ioResolver;
+    this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_");
   }
 
   void translate(final LogicalJoin join, final TranslatorContext context) {
@@ -124,7 +126,7 @@ class JoinTranslator {
             .partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds),
                 m -> m,
                 KVSerde.of(keySerde, valueSerde),
-                "stream_" + joinId)
+                intermediateStreamPrefix + "stream_" + joinId)
             .map(KV::getValue)
             .join(table, joinFn);
     // MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn);
@@ -288,8 +290,18 @@ class JoinTranslator {
     Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
         context.getStreamAppDescriptor().getTable(sourceConfig.getTableDescriptor().get());
 
+    Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+    SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+        (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+
+    // Let's always repartition by the join fields as key before sending the key and value to the table.
+    // We need to repartition the stream denoted as table to ensure that both the stream and table that are joined
+    // have the same partitioning scheme and partition key.
     relOutputStream
-        .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))
+        .partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds),
+            m -> m,
+            KVSerde.of(keySerde, valueSerde),
+            intermediateStreamPrefix + "table_" + joinId)
         .sendTo(table);
 
     return table;

http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
index 216ecea..40a08ff 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
@@ -45,9 +45,11 @@ class LogicalAggregateTranslator {
 
   private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
   private int windowId;
+  private String changeLogStorePrefix;
 
-  LogicalAggregateTranslator(int windowId) {
+  LogicalAggregateTranslator(int windowId, String changeLogStorePrefix) {
     this.windowId = windowId;
+    this.changeLogStorePrefix = changeLogStorePrefix + (changeLogStorePrefix.isEmpty() ? "" : "_");
   }
 
   void translate(final LogicalAggregate aggregate, final TranslatorContext context) {
@@ -69,7 +71,8 @@ class LogicalAggregateTranslator {
                 foldCountFn,
                 new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
                 new LongSerde())
-                .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow_" + windowId)
+                .setAccumulationMode(
+                    AccumulationMode.DISCARDING), changeLogStorePrefix + "_tumblingWindow_" + windowId)
             .map(windowPane -> {
                 List<String> fieldNames = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldNames();
                 List<Object> fieldValues = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues();

http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index a1e7adb..4c5f11c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -177,7 +177,8 @@ public class QueryTranslator {
       public RelNode visit(LogicalJoin join) {
         RelNode node = super.visit(join);
         joinId++;
-        new JoinTranslator(joinId, ioResolver).translate(join, translatorContext);
+        new JoinTranslator(joinId, ioResolver, sqlConfig.getMetadataTopicPrefix())
+            .translate(join, translatorContext);
         return node;
       }
 
@@ -185,7 +186,8 @@ public class QueryTranslator {
       public RelNode visit(LogicalAggregate aggregate) {
         RelNode node = super.visit(aggregate);
         windowId++;
-        new LogicalAggregateTranslator(windowId).translate(aggregate, translatorContext);
+        new LogicalAggregateTranslator(windowId, sqlConfig.getMetadataTopicPrefix())
+            .translate(aggregate, translatorContext);
         return node;
       }
     });

http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index 14314c8..818e33d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -32,10 +32,10 @@ import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlCompositeKey;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
@@ -44,6 +44,9 @@ import org.apache.samza.table.descriptors.TableProviderFactory;
 import org.apache.samza.table.TableSpec;
 import org.apache.samza.table.utils.descriptors.BaseTableProvider;
 
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
+
 
 public class TestIOResolverFactory implements SqlIOResolverFactory {
   public static final String TEST_DB_SYSTEM = "testDb";
@@ -179,9 +182,12 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
     private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
     private final Config config;
     private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
+    private final String changeLogStorePrefix;
 
     public TestIOResolver(Config config) {
       this.config = config;
+      String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+      this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
     }
 
     private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
@@ -200,10 +206,10 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
           if (isSink) {
             tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
           } else {
-            String tableId = "InputTable-" + ioName.replace(".", "-").replace("$", "-");
+            String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
             tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
                 new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
-                new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+                new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled();
           }
           tableDescMap.put(ioName, tableDescriptor);
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index dcd7023..33c6d02 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -164,7 +164,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
     when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
 
     // Apply translate() method to verify that we are getting the correct map operator constructed
-    JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver);
+    JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver, "");
     joinTranslator.translate(mockJoin, mockContext);
     // make sure that context has been registered with LogicFilter and output message streams
     verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));

http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index d0dc23f..250253e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -495,6 +495,7 @@ public class TestQueryTranslator {
             + " join testavro.PROFILE.`$table` as p"
             + " on p.id = pv.profileId";
     config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+    config.put(SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX, "sampleAppv1");
     Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
 
     List<String> sqlStmts = fetchSqlFromConfig(config);
@@ -520,26 +521,36 @@ public class TestQueryTranslator {
     String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
     String input3System = streamConfig.getSystem(input3StreamId);
     String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get();
+    String input4System = streamConfig.getSystem(input4StreamId);
+    String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId);
     String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
     String output1System = streamConfig.getSystem(output1StreamId);
     String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
     String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
     String output2System = streamConfig.getSystem(output2StreamId);
     String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+    String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get();
+    String output3System = streamConfig.getSystem(output3StreamId);
+    String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId);
 
-    Assert.assertEquals(2, specGraph.getOutputStreams().size());
+    Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
-    Assert.assertEquals("testavro", output2System);
-    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", output1PhysicalName);
+    Assert.assertEquals("kafka", output2System);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", output2PhysicalName);
+    Assert.assertEquals("testavro", output3System);
+    Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
-    Assert.assertEquals(3, specGraph.getInputOperators().size());
+    Assert.assertEquals(4, specGraph.getInputOperators().size());
     Assert.assertEquals("testavro", input1System);
     Assert.assertEquals("PAGEVIEW", input1PhysicalName);
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", input3PhysicalName);
+    Assert.assertEquals("kafka", input4System);
+    Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", input4PhysicalName);
   }
 
   @Test
@@ -578,26 +589,36 @@ public class TestQueryTranslator {
     String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
     String input3System = streamConfig.getSystem(input3StreamId);
     String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get();
+    String input4System = streamConfig.getSystem(input4StreamId);
+    String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId);
     String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
     String output1System = streamConfig.getSystem(output1StreamId);
     String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
     String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
     String output2System = streamConfig.getSystem(output2StreamId);
     String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+    String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get();
+    String output3System = streamConfig.getSystem(output3StreamId);
+    String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId);
 
-    Assert.assertEquals(2, specGraph.getOutputStreams().size());
+    Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
-    Assert.assertEquals("testavro", output2System);
-    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName);
+    Assert.assertEquals("kafka", output2System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName);
+    Assert.assertEquals("testavro", output3System);
+    Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
-    Assert.assertEquals(3, specGraph.getInputOperators().size());
+    Assert.assertEquals(4, specGraph.getInputOperators().size());
     Assert.assertEquals("testavro", input1System);
     Assert.assertEquals("PAGEVIEW", input1PhysicalName);
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PROFILE", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName);
+    Assert.assertEquals("kafka", input4System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName);
   }
 
   @Test
@@ -635,26 +656,36 @@ public class TestQueryTranslator {
     String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
     String input3System = streamConfig.getSystem(input3StreamId);
     String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+    String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get();
+    String input4System = streamConfig.getSystem(input4StreamId);
+    String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId);
     String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
     String output1System = streamConfig.getSystem(output1StreamId);
     String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
     String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
     String output2System = streamConfig.getSystem(output2StreamId);
     String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+    String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get();
+    String output3System = streamConfig.getSystem(output3StreamId);
+    String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId);
 
-    Assert.assertEquals(2, specGraph.getOutputStreams().size());
+    Assert.assertEquals(3, specGraph.getOutputStreams().size());
     Assert.assertEquals("kafka", output1System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
-    Assert.assertEquals("testavro", output2System);
-    Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName);
+    Assert.assertEquals("kafka", output2System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName);
+    Assert.assertEquals("testavro", output3System);
+    Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
 
-    Assert.assertEquals(3, specGraph.getInputOperators().size());
+    Assert.assertEquals(4, specGraph.getInputOperators().size());
     Assert.assertEquals("testavro", input1System);
     Assert.assertEquals("PROFILE", input1PhysicalName);
     Assert.assertEquals("testavro", input2System);
     Assert.assertEquals("PAGEVIEW", input2PhysicalName);
     Assert.assertEquals("kafka", input3System);
-    Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
+    Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName);
+    Assert.assertEquals("kafka", input4System);
+    Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName);
   }
 
   @Test