You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/09/04 23:27:38 UTC
samza git commit: SAMZA-1821: fix the IllegalStateException
complaining duplicate key when fetching systemStream configs
Repository: samza
Updated Branches:
refs/heads/master 28ca72965 -> b0b292200
SAMZA-1821: fix the IllegalStateException complaining duplicate key when fetching systemStream configs
## What changes were proposed in this pull request?
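This PR fixes the IllegalStateException ("duplicate key") thrown in SamzaSqlApplicationConfig when fetching the input and output system stream configs. When several SQL statements share the same source or sink, Collectors.toMap encounters the same key more than once; the sources and sinks are now de-duplicated with distinct() before being collected into the maps.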
## How was this patch tested?
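The local build and the existing unit tests pass.
Added a new unit test, testGetInputAndOutputStreamConfigs, which uses two insert statements that read from the same source and checks the resulting input and output config keys.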
Author: Weiqing Yang <wi...@wiyang-mn1.linkedin.biz>
Reviewers: Aditya Toomula <at...@linkedin.com>, Srinivasulu Punuru <sp...@linkedin.com>
Closes #616 from weiqingy/SAMZA-1821 and squashes the following commits:
e85df23f [Weiqing Yang] use distinct()
61245627 [Weiqing Yang] SAMZA-1821: fix the IllegalStateException complaining duplicate key when fetching systemStream configs
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b0b29220
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b0b29220
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b0b29220
Branch: refs/heads/master
Commit: b0b2922002c3e983ca18243bd3d42b3be4e17df8
Parents: 28ca729
Author: Weiqing Yang <wi...@wiyang-mn1.linkedin.biz>
Authored: Tue Sep 4 16:27:30 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Tue Sep 4 16:27:30 2018 -0700
----------------------------------------------------------------------
.../sql/runner/SamzaSqlApplicationConfig.java | 6 ++++--
.../runner/TestSamzaSqlApplicationConfig.java | 22 +++++++++++++++++++-
.../sql/testutil/TestSamzaSqlFileParser.java | 1 -
3 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 316d174..997312f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -113,13 +113,15 @@ public class SamzaSqlApplicationConfig {
inputSystemStreamConfigBySource = queryInfo.stream()
.map(QueryInfo::getSources)
.flatMap(Collection::stream)
- .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src)));
+ .distinct()
+ .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSourceInfo));
Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
outputSystemStreamConfigsBySource = queryInfo.stream()
.map(QueryInfo::getSink)
- .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x)));
+ .distinct()
+ .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSinkInfo));
systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
relSchemaProvidersBySource = systemStreamConfigs.stream()
http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index dac5d02..dda0e14 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -19,13 +19,16 @@
package org.apache.samza.sql.runner;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.JsonUtil;
import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
import org.junit.Assert;
import org.junit.Test;
@@ -76,6 +79,23 @@ public class TestSamzaSqlApplicationConfig {
testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
}
+ @Test
+ public void testGetInputAndOutputStreamConfigs() {
+ List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1",
+ "insert into testavro.Profile select * from testavro.SIMPLE1");
+ Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+ config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+ SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+ Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet();
+ Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet();
+ Assert.assertEquals(1, inputKeys.size());
+ Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1"));
+ Assert.assertEquals(2, outputKeys.size());
+ Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1"));
+ Assert.assertTrue(outputKeys.contains("testavro.Profile"));
+ }
+
private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {
Map<String, String> badConfigs = new HashMap<>(config);
badConfigs.remove(configKey);
http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
index a84f347..1723e0e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
-import org.apache.samza.sql.testutil.SqlFileParser;
import org.junit.Assert;
import org.junit.Test;