You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/12/05 22:14:00 UTC

samza git commit: SAMZA-2019: for a 1-partition broadcast topic, generate topic#0 config

Repository: samza
Updated Branches:
  refs/heads/master b668b5bea -> 1a7e27097


SAMZA-2019: for a 1-partition broadcast topic, generate topic#0 config

+ address a few review comments

Author: Boris S <bs...@linkedin.com>
Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>

Reviewers: xiliu <xi...@linkedin.com>

Closes #846 from sborya/isBroadcast1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1a7e2709
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1a7e2709
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1a7e2709

Branch: refs/heads/master
Commit: 1a7e27097c7509863437e96b1c392e3886ca67ab
Parents: b668b5b
Author: Boris S <bs...@linkedin.com>
Authored: Wed Dec 5 14:13:50 2018 -0800
Committer: Boris S <bs...@linkedin.com>
Committed: Wed Dec 5 14:13:50 2018 -0800

----------------------------------------------------------------------
 .../JobNodeConfigurationGenerator.java          |  6 +++++-
 .../org/apache/samza/execution/StreamEdge.java  | 20 ++++++++++----------
 2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1a7e2709/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index a762dec..761fb05 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -105,7 +105,11 @@ import org.slf4j.LoggerFactory;
     for (StreamEdge inEdge : inEdges.values()) {
       String formattedSystemStream = inEdge.getName();
       if (inEdge.isBroadcast()) {
-        broadcastInputs.add(formattedSystemStream + "#[0-" + (inEdge.getPartitionCount() - 1) + "]");
+        if (inEdge.getPartitionCount() > 1) {
+          broadcastInputs.add(formattedSystemStream + "#[0-" + (inEdge.getPartitionCount() - 1) + "]");
+        } else {
+          broadcastInputs.add(formattedSystemStream + "#0");
+        }
       } else {
         inputs.add(formattedSystemStream);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/1a7e2709/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index ffced0f..051abcf 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -55,7 +55,7 @@ public class StreamEdge {
     this.isIntermediate = isIntermediate;
     // broadcast can be configured either by an operator or via the configs
     this.isBroadcast =
-          isBroadcast || (config == null) ? false : new StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream());
+          isBroadcast || new StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream());
     this.config = config;
     if (isBroadcast && isIntermediate) {
       partitions = 1;
@@ -113,21 +113,21 @@ public class StreamEdge {
   }
 
   Config generateConfig() {
-    Map<String, String> newConfig = new HashMap<>();
+    Map<String, String> streamConfig = new HashMap<>();
     StreamSpec spec = getStreamSpec();
-    newConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName());
-    newConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
+    streamConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName());
+    streamConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
     if (isIntermediate()) {
-      newConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
-      newConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
-      newConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
-      newConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
+      streamConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
+      streamConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
+      streamConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
+      streamConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
     spec.getConfig().forEach((property, value) -> {
-        newConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
+        streamConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
       });
 
-    return new MapConfig(newConfig);
+    return new MapConfig(streamConfig);
   }
 
   public boolean isBroadcast() {