You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/11/12 19:42:57 UTC
samza git commit: SAMZA-1986: Samza-sql: Use system name along with
stream name for streamId
Repository: samza
Updated Branches:
refs/heads/master 6dd3e1823 -> 1d6f693c8
SAMZA-1986: Samza-sql: Use system name along with stream name for streamId
Author: Aditya Toomula <at...@linkedin.com>
Reviewers: Srinivasulu Punuru <sp...@linkedin.com>
Closes #800 from atoomula/streamid
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1d6f693c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1d6f693c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1d6f693c
Branch: refs/heads/master
Commit: 1d6f693c84c7476d0d51a3a47e41908f7ddd1646
Parents: 6dd3e18
Author: Aditya Toomula <at...@linkedin.com>
Authored: Mon Nov 12 11:42:51 2018 -0800
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Mon Nov 12 11:42:51 2018 -0800
----------------------------------------------------------------------
.../samza/sql/interfaces/SqlIOConfig.java | 29 +++++++------
.../sql/runner/SamzaSqlApplicationRunner.java | 4 +-
.../samza/sql/translator/ModifyTranslator.java | 4 +-
.../samza/sql/translator/QueryTranslator.java | 2 +-
.../samza/sql/translator/ScanTranslator.java | 4 +-
.../runner/TestSamzaSqlApplicationRunner.java | 8 ++--
.../samza/sql/system/TestAvroSystemFactory.java | 34 ++++++++++-----
.../samza/sql/testutil/SamzaSqlTestConfig.java | 23 ++++++++++
.../test/samzasql/TestSamzaSqlEndToEnd.java | 45 ++++++++++++++++++++
9 files changed, 119 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
index 3ef1795..5fa30e7 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
@@ -43,9 +43,7 @@ public class SqlIOConfig {
public static final String CFG_SAMZA_REL_TABLE_KEY_CONVERTER = "samzaRelTableKeyConverterName";
public static final String CFG_REL_SCHEMA_PROVIDER = "relSchemaProviderName";
- private final String systemName;
-
- private final String streamName;
+ private final String streamId;
private final String samzaRelConverterName;
private final String samzaRelTableKeyConverterName;
@@ -71,13 +69,16 @@ public class SqlIOConfig {
public SqlIOConfig(String systemName, String streamName, List<String> sourceParts,
Config systemConfig, TableDescriptor tableDescriptor) {
HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
- this.systemName = systemName;
- this.streamName = streamName;
this.source = getSourceFromSourceParts(sourceParts);
this.sourceParts = sourceParts;
this.systemStream = new SystemStream(systemName, streamName);
this.tableDescriptor = Optional.ofNullable(tableDescriptor);
+ // Remote table has no backing stream associated with it and hence streamId does not make sense. But let's keep it
+ // for uniformity. Remote table has table descriptor defined.
+ // Local table has both backing stream and a tableDescriptor defined.
+ this.streamId = String.format("%s-%s", systemName, streamName);
+
samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
Validate.notEmpty(samzaRelConverterName,
String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
@@ -96,10 +97,14 @@ public class SqlIOConfig {
streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
- // Currently, only local table is supported. And it is assumed that all tables are local tables.
- if (tableDescriptor != null) {
- streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamName), "true");
- streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamName), "oldest");
+ if (!isRemoteTable()) {
+ // The below config is required for local table and streams but not for remote table.
+ streamConfigs.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), streamId), streamName);
+ if (tableDescriptor != null) {
+ // For local table, set the bootstrap config and default offset to oldest
+ streamConfigs.put(String.format(StreamConfig.BOOTSTRAP_FOR_STREAM_ID(), streamId), "true");
+ streamConfigs.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), streamId), "oldest");
+ }
}
config = new MapConfig(streamConfigs);
@@ -114,11 +119,11 @@ public class SqlIOConfig {
}
public String getSystemName() {
- return systemName;
+ return systemStream.getSystem();
}
- public String getStreamName() {
- return streamName;
+ public String getStreamId() {
+ return streamId;
}
public String getSamzaRelConverterName() {
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 9d361fb..41e37f1 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -94,14 +94,14 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
// Populate stream to system mapping config for input and output system streams
for (String source : inputSystemStreams) {
SqlIOConfig inputSystemStreamConfig = ioResolver.fetchSourceInfo(source);
- newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamName()),
+ newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, inputSystemStreamConfig.getStreamId()),
inputSystemStreamConfig.getSystemName());
newConfig.putAll(inputSystemStreamConfig.getConfig());
}
for (String sink : outputSystemStreams) {
SqlIOConfig outputSystemStreamConfig = ioResolver.fetchSinkInfo(sink);
- newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamName()),
+ newConfig.put(String.format(CFG_FMT_SAMZA_STREAM_SYSTEM, outputSystemStreamConfig.getStreamId()),
outputSystemStreamConfig.getSystemName());
newConfig.putAll(outputSystemStreamConfig.getConfig());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
index b26f4a7..dbeabab 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ModifyTranslator.java
@@ -99,13 +99,13 @@ class ModifyTranslator {
SqlIOConfig sinkConfig = systemStreamConfig.get(targetName);
final String systemName = sinkConfig.getSystemName();
- final String streamName = sinkConfig.getStreamName();
+ final String streamId = sinkConfig.getStreamId();
final String source = sinkConfig.getSource();
KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
DelegatingSystemDescriptor
sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
- GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamName, noOpKVSerde);
+ GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(streamId, noOpKVSerde);
MessageStreamImpl<SamzaSqlRelMessage> stream =
(MessageStreamImpl<SamzaSqlRelMessage>) context.getMessageStream(tableModify.getInput().getId());
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
index 7f3c11e..a826f9f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/QueryTranslator.java
@@ -209,7 +209,7 @@ public class QueryTranslator {
String systemName = sinkConfig.getSystemName();
DelegatingSystemDescriptor
sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
- GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamName(), noOpKVSerde);
+ GenericOutputDescriptor<KV<Object, Object>> osd = sd.getOutputDescriptor(sinkConfig.getStreamId(), noOpKVSerde);
if (OutputMapFunction.logOutputStream == null) {
OutputMapFunction.logOutputStream = appDesc.getOutputStream(osd);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
index 2615aad..2a5a0e8 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java
@@ -90,7 +90,7 @@ class ScanTranslator {
Validate.isTrue(relMsgConverters.containsKey(sourceName), String.format("Unknown source %s", sourceName));
SqlIOConfig sqlIOConfig = systemStreamConfig.get(sourceName);
final String systemName = sqlIOConfig.getSystemName();
- final String streamName = sqlIOConfig.getStreamName();
+ final String streamId = sqlIOConfig.getStreamId();
final String source = sqlIOConfig.getSource();
final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
@@ -107,7 +107,7 @@ class ScanTranslator {
KVSerde<Object, Object> noOpKVSerde = KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
DelegatingSystemDescriptor
sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
- GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamName, noOpKVSerde);
+ GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamId, noOpKVSerde);
MessageStream<KV<Object, Object>> inputStream = inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd));
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName, queryId));
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index f0df3a9..ccab449 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -40,13 +40,13 @@ public class TestSamzaSqlApplicationRunner {
MapConfig samzaConfig = new MapConfig(configs);
Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, samzaConfig);
Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), LocalApplicationRunner.class.getName());
- // Check whether three new configs added.
- Assert.assertEquals(newConfigs.size(), configs.size() + 3);
+ // Check whether five new configs added.
+ Assert.assertEquals(newConfigs.size(), configs.size() + 5);
newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, samzaConfig);
Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), RemoteApplicationRunner.class.getName());
- // Check whether three new configs added.
- Assert.assertEquals(newConfigs.size(), configs.size() + 3);
+ // Check whether five new configs added.
+ Assert.assertEquals(newConfigs.size(), configs.size() + 5);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
index 9a9e269..f7d13a4 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
@@ -49,6 +49,7 @@ import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -164,17 +165,28 @@ public class TestAvroSystemFactory implements SystemFactory {
@Override
public void register(SystemStreamPartition systemStreamPartition, String offset) {
- if (systemStreamPartition.getStream().toLowerCase().contains("simple1")) {
- simpleRecordSsps.add(systemStreamPartition);
- }
- if (systemStreamPartition.getStream().toLowerCase().contains("profile")) {
- profileRecordSsps.add(systemStreamPartition);
- }
- if (systemStreamPartition.getStream().toLowerCase().contains("company")) {
- companyRecordSsps.add(systemStreamPartition);
- }
- if (systemStreamPartition.getStream().toLowerCase().contains("pageview")) {
- pageViewRecordSsps.add(systemStreamPartition);
+ switch (systemStreamPartition.getStream().toLowerCase()) {
+ case "simple1":
+ simpleRecordSsps.add(systemStreamPartition);
+ break;
+ case "profile":
+ profileRecordSsps.add(systemStreamPartition);
+ break;
+ case "company":
+ companyRecordSsps.add(systemStreamPartition);
+ break;
+ case "pageview":
+ pageViewRecordSsps.add(systemStreamPartition);
+ break;
+ case "complex1":
+ break;
+ case "simple2":
+ break;
+ case "simple3":
+ break;
+ default:
+ Assert.assertTrue(String.format("ssp %s is not recognized", systemStreamPartition), false);
+ break;
}
curMessagesPerSsp.put(systemStreamPartition, 0);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index 80b47eb..f9f4124 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -53,6 +53,8 @@ import static org.apache.samza.sql.testutil.RemoteStoreIOResolverTestFactory.TES
public class SamzaSqlTestConfig {
public static final String SAMZA_SYSTEM_TEST_AVRO = "testavro";
+ public static final String SAMZA_SYSTEM_TEST_AVRO2 = "testavro2";
+ public static final String SAMZA_SYSTEM_TEST_DB = "testDb";
public static Map<String, String> fetchStaticConfigsWithFactories(int numberOfMessages) {
return fetchStaticConfigsWithFactories(new HashMap<>(), numberOfMessages, false);
@@ -109,6 +111,24 @@ public class SamzaSqlTestConfig {
staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_TABLE_KEY_CONVERTER, "sample");
staticConfigs.put(testRemoteStoreSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+ String avro2SystemConfigPrefix =
+ String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO2);
+ String avro2SamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_AVRO2);
+ staticConfigs.put(avro2SystemConfigPrefix + "samza.factory", TestAvroSystemFactory.class.getName());
+ staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_NUM_MESSAGES,
+ String.valueOf(numberOfMessages));
+ staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_INCLUDE_NULL_FOREIGN_KEYS,
+ includeNullForeignKeys ? "true" : "false");
+ staticConfigs.put(avro2SystemConfigPrefix + TestAvroSystemFactory.CFG_SLEEP_BETWEEN_POLLS_MS,
+ String.valueOf(windowDurationMs / 2));
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_GROUPBY_WINDOW_DURATION_MS, String.valueOf(windowDurationMs));
+ staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+ staticConfigs.put(avro2SamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
+ String testDbSamzaSqlConfigPrefix = configIOResolverDomain + String.format("%s.", SAMZA_SYSTEM_TEST_DB);
+ staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER, "avro");
+ staticConfigs.put(testDbSamzaSqlConfigPrefix + SqlIOConfig.CFG_REL_SCHEMA_PROVIDER, "config");
+
String avroSamzaToRelMsgConverterDomain =
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
@@ -133,6 +153,9 @@ public class SamzaSqlTestConfig {
"testavro", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
+ "testavro2", "SIMPLE1"), SimpleRecord.SCHEMA$.toString());
+
+ staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
"testavro", "SIMPLE2"), SimpleRecord.SCHEMA$.toString());
staticConfigs.put(configAvroRelSchemaProviderDomain + String.format(ConfigBasedAvroRelSchemaProviderFactory.CFG_SOURCE_SCHEMA,
http://git-wip-us.apache.org/repos/asf/samza/blob/1d6f693c/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 77538ff..3f2148c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -90,6 +90,26 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
}
@Test
+ public void testEndToEndWithDifferentSystemSameStream() {
+ int numMessages = 20;
+
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ String sql = "Insert into testavro2.SIMPLE1 select * from testavro.SIMPLE1";
+ List<String> sqlStmts = Arrays.asList(sql);
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+ runner.runAndWaitForFinish();
+
+ List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+ .sorted()
+ .collect(Collectors.toList());
+ Assert.assertEquals(numMessages, outMessages.size());
+ Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
+ }
+
+ @Test
public void testEndToEndMultiSqlStmts() {
int numMessages = 20;
TestAvroSystemFactory.messages.clear();
@@ -110,6 +130,31 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness {
Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
}
+ // The below test won't work until SAMZA-1990 is fixed. Currently, Samza framework does not allow same system stream
+ // to be used as both input and output stream.
+ @Ignore
+ @Test
+ public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() {
+ int numMessages = 20;
+ TestAvroSystemFactory.messages.clear();
+ Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
+ String sql1 = "Insert into testavro.SIMPLE1 select * from testavro.SIMPLE2";
+ String sql2 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
+ List<String> sqlStmts = Arrays.asList(sql1, sql2);
+
+ staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
+ runner.runAndWaitForFinish();
+ List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
+ .map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
+ .sorted()
+ .collect(Collectors.toList());
+ Assert.assertEquals(numMessages * 2, outMessages.size());
+ Set<Integer> outMessagesSet = new HashSet<>(outMessages);
+ Assert.assertEquals(numMessages, outMessagesSet.size());
+ Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet)));
+ }
+
@Test
public void testEndToEndFanIn() {
int numMessages = 20;