You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/10/25 21:16:46 UTC
samza git commit: SAMZA-1927: Samza-sql - always repartition the
stream denoted by stream-table join.
Repository: samza
Updated Branches:
refs/heads/master 43a501b49 -> 58f18117c
SAMZA-1927: Samza-sql - always repartition the stream denoted by stream-table join.
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>
Closes #676 from atoomula/dsl3 and squashes the following commits:
e86ef83c [Aditya Toomula] Adding metadatastream prefix config. This will be used to reset both the intermediate streams and changelogstore streams by changing the prefix name.
14450264 [Aditya Toomula] Adding metadatastream prefix config. This will be used to reset both the intermediate streams and changelogstore streams by changing the prefix name.
c3289673 [Aditya Toomula] Adding changelogstreamname prefix config. This will be used to reset the state by changing the prefix name.
804b07a1 [Aditya Toomula] SAMZA-1927: Samza-sql - always repartition the stream denoted by stream-table join.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/58f18117
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/58f18117
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/58f18117
Branch: refs/heads/master
Commit: 58f18117ccddc62cf4c74325faa8dfa6ad9ec8e3
Parents: 43a501b
Author: Aditya Toomula <at...@linkedin.com>
Authored: Thu Oct 25 14:16:24 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Thu Oct 25 14:16:24 2018 -0700
----------------------------------------------------------------------
.../sql/impl/ConfigBasedIOResolverFactory.java | 13 +++-
.../sql/runner/SamzaSqlApplicationConfig.java | 20 +++++-
.../samza/sql/translator/JoinTranslator.java | 18 +++++-
.../translator/LogicalAggregateTranslator.java | 7 +-
.../samza/sql/translator/QueryTranslator.java | 6 +-
.../sql/testutil/TestIOResolverFactory.java | 12 +++-
.../sql/translator/TestJoinTranslator.java | 2 +-
.../sql/translator/TestQueryTranslator.java | 67 ++++++++++++++------
8 files changed, 111 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
index 7faff17..2514d30 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/impl/ConfigBasedIOResolverFactory.java
@@ -25,14 +25,17 @@ import org.apache.samza.table.descriptors.TableDescriptor;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.sql.data.SamzaSqlCompositeKey;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
import org.apache.samza.sql.interfaces.SqlIOConfig;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
+
/**
* Source Resolver implementation that uses static config to return a config corresponding to a system stream.
@@ -54,9 +57,12 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
private class ConfigBasedIOResolver implements SqlIOResolver {
private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
private final Config config;
+ private final String changeLogStorePrefix;
public ConfigBasedIOResolver(Config config) {
this.config = config;
+ String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+ this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
}
@Override
@@ -100,9 +106,10 @@ public class ConfigBasedIOResolverFactory implements SqlIOResolverFactory {
TableDescriptor tableDescriptor = null;
if (isTable) {
- tableDescriptor = new RocksDbTableDescriptor("InputTable-" + name, KVSerde.of(
+ String tableId = changeLogStorePrefix + "InputTable-" + name.replace(".", "-").replace("$", "-");
+ tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
- new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+ new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled();
}
return new SqlIOConfig(systemName, streamName, fetchSystemConfigs(systemName), tableDescriptor);
http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 745c934..dcb5043 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -19,6 +19,7 @@
package org.apache.samza.sql.runner;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -82,10 +83,13 @@ public class SamzaSqlApplicationConfig {
public static final String CFG_UDF_RESOLVER = "samza.sql.udfResolver";
public static final String CFG_FMT_UDF_RESOLVER_DOMAIN = "samza.sql.udfResolver.%s.";
+ public static final String CFG_METADATA_TOPIC_PREFIX = "samza.sql.metadataTopicPrefix";
public static final String CFG_GROUPBY_WINDOW_DURATION_MS = "samza.sql.groupby.window.ms";
public static final String SAMZA_SYSTEM_LOG = "log";
+ public static final String DEFAULT_METADATA_TOPIC_PREFIX = "";
+
private static final long DEFAULT_GROUPBY_WINDOW_DURATION_MS = 300000; // default groupby window duration is 5 mins.
private final Map<String, RelSchemaProvider> relSchemaProvidersBySource;
@@ -100,6 +104,7 @@ public class SamzaSqlApplicationConfig {
private final Map<String, SqlIOConfig> outputSystemStreamConfigsBySource;
private final Map<String, SqlIOConfig> systemStreamConfigsBySource;
+ private final String metadataTopicPrefix;
private final long windowDurationMs;
public SamzaSqlApplicationConfig(Config staticConfig, Set<String> inputSystemStreams,
@@ -133,6 +138,8 @@ public class SamzaSqlApplicationConfig {
udfResolver = createUdfResolver(staticConfig);
udfMetadata = udfResolver.getUdfs();
+ metadataTopicPrefix =
+ staticConfig.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
windowDurationMs = staticConfig.getLong(CFG_GROUPBY_WINDOW_DURATION_MS, DEFAULT_GROUPBY_WINDOW_DURATION_MS);
// remove the SqlIOConfigs of outputs whose system is "log" out of systemStreamConfigsBySource
@@ -168,9 +175,14 @@ public class SamzaSqlApplicationConfig {
public static SqlIOResolver createIOResolver(Config config) {
String sourceResolveValue = config.get(CFG_IO_RESOLVER);
+ Map<String, String> metadataPrefixProperties = new HashMap<>();
+ metadataPrefixProperties.put(
+ String.format(CFG_FMT_SOURCE_RESOLVER_DOMAIN, sourceResolveValue) + CFG_METADATA_TOPIC_PREFIX,
+ config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX));
+ Config newConfig = new MapConfig(Arrays.asList(config, metadataPrefixProperties));
Validate.notEmpty(sourceResolveValue, "ioResolver config is not set or empty");
- return initializePlugin("SqlIOResolver", sourceResolveValue, config, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
- (o, c) -> ((SqlIOResolverFactory) o).create(c, config));
+ return initializePlugin("SqlIOResolver", sourceResolveValue, newConfig, CFG_FMT_SOURCE_RESOLVER_DOMAIN,
+ (o, c) -> ((SqlIOResolverFactory) o).create(c, newConfig));
}
private UdfResolver createUdfResolver(Map<String, String> config) {
@@ -283,6 +295,10 @@ public class SamzaSqlApplicationConfig {
return ioResolver;
}
+ public String getMetadataTopicPrefix() {
+ return metadataTopicPrefix;
+ }
+
public long getWindowDurationMs() {
return windowDurationMs;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
index ac2c64d..0761898 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
@@ -76,10 +76,12 @@ class JoinTranslator {
private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
private int joinId;
private SqlIOResolver ioResolver;
+ private final String intermediateStreamPrefix;
- JoinTranslator(int joinId, SqlIOResolver ioResolver) {
+ JoinTranslator(int joinId, SqlIOResolver ioResolver, String intermediateStreamPrefix) {
this.joinId = joinId;
this.ioResolver = ioResolver;
+ this.intermediateStreamPrefix = intermediateStreamPrefix + (intermediateStreamPrefix.isEmpty() ? "" : "_");
}
void translate(final LogicalJoin join, final TranslatorContext context) {
@@ -124,7 +126,7 @@ class JoinTranslator {
.partitionBy(m -> createSamzaSqlCompositeKey(m, streamKeyIds),
m -> m,
KVSerde.of(keySerde, valueSerde),
- "stream_" + joinId)
+ intermediateStreamPrefix + "stream_" + joinId)
.map(KV::getValue)
.join(table, joinFn);
// MessageStream<SamzaSqlRelMessage> outputStream = inputStream.join(table, joinFn);
@@ -288,8 +290,18 @@ class JoinTranslator {
Table<KV<SamzaSqlCompositeKey, SamzaSqlRelMessage>> table =
context.getStreamAppDescriptor().getTable(sourceConfig.getTableDescriptor().get());
+ Serde<SamzaSqlCompositeKey> keySerde = new JsonSerdeV2<>(SamzaSqlCompositeKey.class);
+ SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde valueSerde =
+ (SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde) new SamzaSqlRelMessageSerdeFactory().getSerde(null, null);
+
+ // Let's always repartition by the join fields as key before sending the key and value to the table.
+ // We need to repartition the stream denoted as table to ensure that both the stream and table that are joined
+ // have the same partitioning scheme and partition key.
relOutputStream
- .map(m -> new KV(createSamzaSqlCompositeKey(m, tableKeyIds), m))
+ .partitionBy(m -> createSamzaSqlCompositeKey(m, tableKeyIds),
+ m -> m,
+ KVSerde.of(keySerde, valueSerde),
+ intermediateStreamPrefix + "table_" + joinId)
.sendTo(table);
return table;
http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
index 216ecea..40a08ff 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/LogicalAggregateTranslator.java
@@ -45,9 +45,11 @@ class LogicalAggregateTranslator {
private static final Logger log = LoggerFactory.getLogger(JoinTranslator.class);
private int windowId;
+ private String changeLogStorePrefix;
- LogicalAggregateTranslator(int windowId) {
+ LogicalAggregateTranslator(int windowId, String changeLogStorePrefix) {
this.windowId = windowId;
+ this.changeLogStorePrefix = changeLogStorePrefix + (changeLogStorePrefix.isEmpty() ? "" : "_");
}
void translate(final LogicalAggregate aggregate, final TranslatorContext context) {
@@ -69,7 +71,8 @@ class LogicalAggregateTranslator {
foldCountFn,
new SamzaSqlRelMessageSerdeFactory.SamzaSqlRelMessageSerde(),
new LongSerde())
- .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow_" + windowId)
+ .setAccumulationMode(
+ AccumulationMode.DISCARDING), changeLogStorePrefix + "_tumblingWindow_" + windowId)
.map(windowPane -> {
List<String> fieldNames = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldNames();
List<Object> fieldValues = windowPane.getKey().getKey().getSamzaSqlRelRecord().getFieldValues();
http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index a1e7adb..4c5f11c 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -177,7 +177,8 @@ public class QueryTranslator {
public RelNode visit(LogicalJoin join) {
RelNode node = super.visit(join);
joinId++;
- new JoinTranslator(joinId, ioResolver).translate(join, translatorContext);
+ new JoinTranslator(joinId, ioResolver, sqlConfig.getMetadataTopicPrefix())
+ .translate(join, translatorContext);
return node;
}
@@ -185,7 +186,8 @@ public class QueryTranslator {
public RelNode visit(LogicalAggregate aggregate) {
RelNode node = super.visit(aggregate);
windowId++;
- new LogicalAggregateTranslator(windowId).translate(aggregate, translatorContext);
+ new LogicalAggregateTranslator(windowId, sqlConfig.getMetadataTopicPrefix())
+ .translate(aggregate, translatorContext);
return node;
}
});
http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
index 14314c8..818e33d 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java
@@ -32,10 +32,10 @@ import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.sql.data.SamzaSqlCompositeKey;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
import org.apache.samza.sql.interfaces.SqlIOConfig;
import org.apache.samza.sql.interfaces.SqlIOResolver;
import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
+import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
import org.apache.samza.table.ReadWriteTable;
import org.apache.samza.table.Table;
@@ -44,6 +44,9 @@ import org.apache.samza.table.descriptors.TableProviderFactory;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.utils.descriptors.BaseTableProvider;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX;
+import static org.apache.samza.sql.runner.SamzaSqlApplicationConfig.DEFAULT_METADATA_TOPIC_PREFIX;
+
public class TestIOResolverFactory implements SqlIOResolverFactory {
public static final String TEST_DB_SYSTEM = "testDb";
@@ -179,9 +182,12 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
private final String SAMZA_SQL_QUERY_TABLE_KEYWORD = "$table";
private final Config config;
private final Map<String, TableDescriptor> tableDescMap = new HashMap<>();
+ private final String changeLogStorePrefix;
public TestIOResolver(Config config) {
this.config = config;
+ String metadataTopicPrefix = config.get(CFG_METADATA_TOPIC_PREFIX, DEFAULT_METADATA_TOPIC_PREFIX);
+ this.changeLogStorePrefix = metadataTopicPrefix + (metadataTopicPrefix.isEmpty() ? "" : "_");
}
private SqlIOConfig fetchIOInfo(String ioName, boolean isSink) {
@@ -200,10 +206,10 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
if (isSink) {
tableDescriptor = new TestTableDescriptor(TEST_TABLE_ID + tableDescMap.size());
} else {
- String tableId = "InputTable-" + ioName.replace(".", "-").replace("$", "-");
+ String tableId = changeLogStorePrefix + "InputTable-" + ioName.replace(".", "-").replace("$", "-");
tableDescriptor = new RocksDbTableDescriptor(tableId, KVSerde.of(
new JsonSerdeV2<>(SamzaSqlCompositeKey.class),
- new JsonSerdeV2<>(SamzaSqlRelMessage.class)));
+ new SamzaSqlRelMessageSerdeFactory().getSerde(null, null))).withChangelogEnabled();
}
tableDescMap.put(ioName, tableDescriptor);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
index dcd7023..33c6d02 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestJoinTranslator.java
@@ -164,7 +164,7 @@ public class TestJoinTranslator extends TranslatorTestBase {
when(mockIOConfig.getTableDescriptor()).thenReturn(Optional.of(mockTableDesc));
// Apply translate() method to verify that we are getting the correct map operator constructed
- JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver);
+ JoinTranslator joinTranslator = new JoinTranslator(3, mockResolver, "");
joinTranslator.translate(mockJoin, mockContext);
// make sure that context has been registered with LogicFilter and output message streams
verify(mockContext, times(1)).registerMessageStream(3, this.getRegisteredMessageStream(3));
http://git-wip-us.apache.org/repos/asf/samza/blob/58f18117/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
index d0dc23f..250253e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/translator/TestQueryTranslator.java
@@ -495,6 +495,7 @@ public class TestQueryTranslator {
+ " join testavro.PROFILE.`$table` as p"
+ " on p.id = pv.profileId";
config.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql);
+ config.put(SamzaSqlApplicationConfig.CFG_METADATA_TOPIC_PREFIX, "sampleAppv1");
Config samzaConfig = SamzaSqlApplicationRunner.computeSamzaConfigs(true, new MapConfig(config));
List<String> sqlStmts = fetchSqlFromConfig(config);
@@ -520,26 +521,36 @@ public class TestQueryTranslator {
String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
String input3System = streamConfig.getSystem(input3StreamId);
String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+ String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get();
+ String input4System = streamConfig.getSystem(input4StreamId);
+ String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId);
String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
String output1System = streamConfig.getSystem(output1StreamId);
String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
String output2System = streamConfig.getSystem(output2StreamId);
String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+ String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get();
+ String output3System = streamConfig.getSystem(output3StreamId);
+ String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId);
- Assert.assertEquals(2, specGraph.getOutputStreams().size());
+ Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
- Assert.assertEquals("testavro", output2System);
- Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", output1PhysicalName);
+ Assert.assertEquals("kafka", output2System);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", output2PhysicalName);
+ Assert.assertEquals("testavro", output3System);
+ Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
- Assert.assertEquals(3, specGraph.getInputOperators().size());
+ Assert.assertEquals(4, specGraph.getInputOperators().size());
Assert.assertEquals("testavro", input1System);
Assert.assertEquals("PAGEVIEW", input1PhysicalName);
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PROFILE", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_table_1", input3PhysicalName);
+ Assert.assertEquals("kafka", input4System);
+ Assert.assertEquals("sql-job-1-partition_by-sampleAppv1_stream_1", input4PhysicalName);
}
@Test
@@ -578,26 +589,36 @@ public class TestQueryTranslator {
String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
String input3System = streamConfig.getSystem(input3StreamId);
String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+ String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get();
+ String input4System = streamConfig.getSystem(input4StreamId);
+ String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId);
String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
String output1System = streamConfig.getSystem(output1StreamId);
String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
String output2System = streamConfig.getSystem(output2StreamId);
String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+ String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get();
+ String output3System = streamConfig.getSystem(output3StreamId);
+ String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId);
- Assert.assertEquals(2, specGraph.getOutputStreams().size());
+ Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
- Assert.assertEquals("testavro", output2System);
- Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName);
+ Assert.assertEquals("kafka", output2System);
+ Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName);
+ Assert.assertEquals("testavro", output3System);
+ Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
- Assert.assertEquals(3, specGraph.getInputOperators().size());
+ Assert.assertEquals(4, specGraph.getInputOperators().size());
Assert.assertEquals("testavro", input1System);
Assert.assertEquals("PAGEVIEW", input1PhysicalName);
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PROFILE", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName);
+ Assert.assertEquals("kafka", input4System);
+ Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName);
}
@Test
@@ -635,26 +656,36 @@ public class TestQueryTranslator {
String input3StreamId = specGraph.getInputOperators().keySet().stream().skip(2).findFirst().get();
String input3System = streamConfig.getSystem(input3StreamId);
String input3PhysicalName = streamConfig.getPhysicalName(input3StreamId);
+ String input4StreamId = specGraph.getInputOperators().keySet().stream().skip(3).findFirst().get();
+ String input4System = streamConfig.getSystem(input4StreamId);
+ String input4PhysicalName = streamConfig.getPhysicalName(input4StreamId);
String output1StreamId = specGraph.getOutputStreams().keySet().stream().findFirst().get();
String output1System = streamConfig.getSystem(output1StreamId);
String output1PhysicalName = streamConfig.getPhysicalName(output1StreamId);
String output2StreamId = specGraph.getOutputStreams().keySet().stream().skip(1).findFirst().get();
String output2System = streamConfig.getSystem(output2StreamId);
String output2PhysicalName = streamConfig.getPhysicalName(output2StreamId);
+ String output3StreamId = specGraph.getOutputStreams().keySet().stream().skip(2).findFirst().get();
+ String output3System = streamConfig.getSystem(output3StreamId);
+ String output3PhysicalName = streamConfig.getPhysicalName(output3StreamId);
- Assert.assertEquals(2, specGraph.getOutputStreams().size());
+ Assert.assertEquals(3, specGraph.getOutputStreams().size());
Assert.assertEquals("kafka", output1System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", output1PhysicalName);
- Assert.assertEquals("testavro", output2System);
- Assert.assertEquals("enrichedPageViewTopic", output2PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_1", output1PhysicalName);
+ Assert.assertEquals("kafka", output2System);
+ Assert.assertEquals("sql-job-1-partition_by-stream_1", output2PhysicalName);
+ Assert.assertEquals("testavro", output3System);
+ Assert.assertEquals("enrichedPageViewTopic", output3PhysicalName);
- Assert.assertEquals(3, specGraph.getInputOperators().size());
+ Assert.assertEquals(4, specGraph.getInputOperators().size());
Assert.assertEquals("testavro", input1System);
Assert.assertEquals("PROFILE", input1PhysicalName);
Assert.assertEquals("testavro", input2System);
Assert.assertEquals("PAGEVIEW", input2PhysicalName);
Assert.assertEquals("kafka", input3System);
- Assert.assertEquals("sql-job-1-partition_by-stream_1", input3PhysicalName);
+ Assert.assertEquals("sql-job-1-partition_by-table_1", input3PhysicalName);
+ Assert.assertEquals("kafka", input4System);
+ Assert.assertEquals("sql-job-1-partition_by-stream_1", input4PhysicalName);
}
@Test