You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/12/05 22:14:00 UTC
samza git commit: SAMZA-2019: for 1 partition broadcast topic generate topic#0 config
Repository: samza
Updated Branches:
refs/heads/master b668b5bea -> 1a7e27097
SAMZA-2019: for a 1-partition broadcast topic, generate the topic#0 config
+ address a few review comments
Author: Boris S <bs...@linkedin.com>
Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>
Reviewers: xiliu <xi...@linkedin.com>
Closes #846 from sborya/isBroadcast1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1a7e2709
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1a7e2709
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1a7e2709
Branch: refs/heads/master
Commit: 1a7e27097c7509863437e96b1c392e3886ca67ab
Parents: b668b5b
Author: Boris S <bs...@linkedin.com>
Authored: Wed Dec 5 14:13:50 2018 -0800
Committer: Boris S <bs...@linkedin.com>
Committed: Wed Dec 5 14:13:50 2018 -0800
----------------------------------------------------------------------
.../JobNodeConfigurationGenerator.java | 6 +++++-
.../org/apache/samza/execution/StreamEdge.java | 20 ++++++++++----------
2 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1a7e2709/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index a762dec..761fb05 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -105,7 +105,11 @@ import org.slf4j.LoggerFactory;
for (StreamEdge inEdge : inEdges.values()) {
String formattedSystemStream = inEdge.getName();
if (inEdge.isBroadcast()) {
- broadcastInputs.add(formattedSystemStream + "#[0-" + (inEdge.getPartitionCount() - 1) + "]");
+ if (inEdge.getPartitionCount() > 1) {
+ broadcastInputs.add(formattedSystemStream + "#[0-" + (inEdge.getPartitionCount() - 1) + "]");
+ } else {
+ broadcastInputs.add(formattedSystemStream + "#0");
+ }
} else {
inputs.add(formattedSystemStream);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1a7e2709/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index ffced0f..051abcf 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -55,7 +55,7 @@ public class StreamEdge {
this.isIntermediate = isIntermediate;
// broadcast can be configured either by an operator or via the configs
this.isBroadcast =
- isBroadcast || (config == null) ? false : new StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream());
+ isBroadcast || new StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream());
this.config = config;
if (isBroadcast && isIntermediate) {
partitions = 1;
@@ -113,21 +113,21 @@ public class StreamEdge {
}
Config generateConfig() {
- Map<String, String> newConfig = new HashMap<>();
+ Map<String, String> streamConfig = new HashMap<>();
StreamSpec spec = getStreamSpec();
- newConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName());
- newConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
+ streamConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName());
+ streamConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
if (isIntermediate()) {
- newConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
- newConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
- newConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
- newConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
+ streamConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
+ streamConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
+ streamConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
+ streamConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
}
spec.getConfig().forEach((property, value) -> {
- newConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
+ streamConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
});
- return new MapConfig(newConfig);
+ return new MapConfig(streamConfig);
}
public boolean isBroadcast() {