You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sr...@apache.org on 2018/09/04 23:27:38 UTC

samza git commit: SAMZA-1821: fix the IllegalStateException complaining duplicate key when fetching systemStream configs

Repository: samza
Updated Branches:
  refs/heads/master 28ca72965 -> b0b292200


SAMZA-1821: fix the IllegalStateException complaining duplicate key when fetching systemStream configs

## What changes were proposed in this pull request?
This PR is to fix IllegalStateException complaining duplicate key when fetching resource configs in SamzaSqlApplicationConfig.

## How was this patch tested?
Pass the local build and current unit tests.
Added a new unit test.

Author: Weiqing Yang <wi...@wiyang-mn1.linkedin.biz>

Reviewers: Aditya Toomula <at...@linkedin.com>, Srinivasulu Punuru <sp...@linkedin.com>

Closes #616 from weiqingy/SAMZA-1821 and squashes the following commits:

e85df23f [Weiqing Yang] use distinct()
61245627 [Weiqing Yang] SAMZA-1821: fix the IllegalStateException complaining duplicate key when fetching systemStream configs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b0b29220
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b0b29220
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b0b29220

Branch: refs/heads/master
Commit: b0b2922002c3e983ca18243bd3d42b3be4e17df8
Parents: 28ca729
Author: Weiqing Yang <wi...@wiyang-mn1.linkedin.biz>
Authored: Tue Sep 4 16:27:30 2018 -0700
Committer: Srinivasulu Punuru <sp...@linkedin.com>
Committed: Tue Sep 4 16:27:30 2018 -0700

----------------------------------------------------------------------
 .../sql/runner/SamzaSqlApplicationConfig.java   |  6 ++++--
 .../runner/TestSamzaSqlApplicationConfig.java   | 22 +++++++++++++++++++-
 .../sql/testutil/TestSamzaSqlFileParser.java    |  1 -
 3 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
index 316d174..997312f 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationConfig.java
@@ -113,13 +113,15 @@ public class SamzaSqlApplicationConfig {
     inputSystemStreamConfigBySource = queryInfo.stream()
         .map(QueryInfo::getSources)
         .flatMap(Collection::stream)
-        .collect(Collectors.toMap(Function.identity(), src -> ioResolver.fetchSourceInfo(src)));
+        .distinct()
+        .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSourceInfo));
 
     Set<SqlIOConfig> systemStreamConfigs = new HashSet<>(inputSystemStreamConfigBySource.values());
 
     outputSystemStreamConfigsBySource = queryInfo.stream()
         .map(QueryInfo::getSink)
-        .collect(Collectors.toMap(Function.identity(), x -> ioResolver.fetchSinkInfo(x)));
+        .distinct()
+        .collect(Collectors.toMap(Function.identity(), ioResolver::fetchSinkInfo));
     systemStreamConfigs.addAll(outputSystemStreamConfigsBySource.values());
 
     relSchemaProvidersBySource = systemStreamConfigs.stream()

http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
index dac5d02..dda0e14 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationConfig.java
@@ -19,13 +19,16 @@
 
 package org.apache.samza.sql.runner;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.sql.impl.ConfigBasedUdfResolver;
 import org.apache.samza.sql.interfaces.SqlIOConfig;
-import org.apache.samza.sql.runner.SamzaSqlApplicationConfig;
+import org.apache.samza.sql.testutil.JsonUtil;
 import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
 import org.junit.Assert;
 import org.junit.Test;
@@ -76,6 +79,23 @@ public class TestSamzaSqlApplicationConfig {
     testWithoutConfigShouldPass(config, logSamzaSqlConfigPrefix + SqlIOConfig.CFG_SAMZA_REL_CONVERTER);
   }
 
+  @Test
+  public void testGetInputAndOutputStreamConfigs() {
+    List<String> sqlStmts = Arrays.asList("Insert into testavro.COMPLEX1 select * from testavro.SIMPLE1",
+        "insert into testavro.Profile select * from testavro.SIMPLE1");
+    Map<String, String> config = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
+    config.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    SamzaSqlApplicationConfig samzaSqlApplicationConfig = new SamzaSqlApplicationConfig(new MapConfig(config));
+
+    Set<String> inputKeys = samzaSqlApplicationConfig.getInputSystemStreamConfigBySource().keySet();
+    Set<String> outputKeys = samzaSqlApplicationConfig.getOutputSystemStreamConfigsBySource().keySet();
+    Assert.assertEquals(1, inputKeys.size());
+    Assert.assertTrue(inputKeys.contains("testavro.SIMPLE1"));
+    Assert.assertEquals(2, outputKeys.size());
+    Assert.assertTrue(outputKeys.contains("testavro.COMPLEX1"));
+    Assert.assertTrue(outputKeys.contains("testavro.Profile"));
+  }
+
   private void testWithoutConfigShouldPass(Map<String, String> config, String configKey) {
     Map<String, String> badConfigs = new HashMap<>(config);
     badConfigs.remove(configKey);

http://git-wip-us.apache.org/repos/asf/samza/blob/b0b29220/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
index a84f347..1723e0e 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/TestSamzaSqlFileParser.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.List;
 
-import org.apache.samza.sql.testutil.SqlFileParser;
 import org.junit.Assert;
 import org.junit.Test;