Posted to commits@samza.apache.org by bo...@apache.org on 2018/12/05 19:54:56 UTC

samza git commit: SAMZA-2019: add Is broadcast per stream config

Repository: samza
Updated Branches:
  refs/heads/master 5ea72584f -> b668b5bea


SAMZA-2019: add Is broadcast per stream config

Author: Boris S <bs...@linkedin.com>
Author: Boris S <bo...@apache.org>
Author: Boris Shkolnik <bs...@linkedin.com>

Reviewers: xinyuiscool <xi...@linkedin.com>, Prateek M <pr...@apache.org>

Closes #837 from sborya/isBroadcast


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b668b5be
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b668b5be
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b668b5be

Branch: refs/heads/master
Commit: b668b5bea825cf84c06a342f1a63470d131a5749
Parents: 5ea7258
Author: Boris S <bs...@linkedin.com>
Authored: Wed Dec 5 11:54:44 2018 -0800
Committer: Boris S <bs...@linkedin.com>
Committed: Wed Dec 5 11:54:44 2018 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  7 ++++++
 .../versioned/jobs/samza-configurations.md      |  3 ++-
 .../JobNodeConfigurationGenerator.java          |  2 +-
 .../org/apache/samza/execution/StreamEdge.java  | 25 +++++++++++---------
 .../org/apache/samza/config/StreamConfig.scala  |  5 ++++
 .../apache/samza/config/TestStreamConfig.java   |  4 ++++
 .../samza/execution/TestExecutionPlanner.java   | 18 ++++++++++++++
 .../apache/samza/execution/TestJobGraph.java    | 13 +++++-----
 .../apache/samza/testUtils/StreamTestUtils.java |  2 +-
 9 files changed, 59 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index ad9f69f..6887c35 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1180,6 +1180,13 @@
                     </td>
                 </tr>
 
+                    <td class="property" id="streams-samza-broadcast">streams.<span class="stream">stream-id</span>.<br>samza.broadcast</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        If set to <code>true</code>, this stream will be broadcast to all the tasks.
+                    </td>
+                </tr>
+
                 <tr>
                     <td class="property" id="streams-properties">streams.<span class="stream">stream-id</span>.*</td>
                     <td class="default"></td>

http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/docs/learn/documentation/versioned/jobs/samza-configurations.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index e55446a..54ec12c 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -140,6 +140,7 @@ Samza consumes from and produces to [Streams](../container/streams.html) and has
 |streams.**_stream-id_**.<br>samza.reset.offset|false|If set to true, when a Samza container starts up, it ignores any [checkpointed offset](../container/checkpointing.html) for this particular input stream. Its behavior is thus determined by the `samza.offset.default` setting. Note that the reset takes effect every time a container is started, which may be every time you restart your job, or more frequently if a container fails and is restarted by the framework.|
 |streams.**_stream-id_**.<br>samza.priority|-1|If one or more streams have a priority set (any positive integer), they will be processed with [higher priority](../container/streams.html#prioritizing-input-streams) than the other streams. You can set several streams to the same priority, or define multiple priority levels by assigning a higher number to the higher-priority streams. If a higher-priority stream has any messages available, they will always be processed first; messages from lower-priority streams are only processed when there are no new messages on higher-priority inputs.|
 |streams.**_stream-id_**.<br>samza.bootstrap|false|If set to true, this stream will be processed as a [bootstrap stream](../container/streams.html#bootstrapping). This means that every time a Samza container starts up, this stream will be fully consumed before messages from any other stream are processed.|
+|streams.**_stream-id_**.<br>samza.broadcast|false|If set to true, this stream will be processed as a [broadcast stream](../container/samza-container.html#broadcast-streams). This means that ALL the partitions of this stream will be delivered to all the tasks.|
 |task.consumer.batch.size|1|If set to a positive integer, the task will try to consume batches with the given number of messages from each input stream, rather than consuming round-robin from all the input streams on each individual message. Setting this property can improve performance in some cases.|
 
 #### <a name="kafka"></a>[3.2 Kafka](#kafka)
@@ -333,4 +334,4 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |metrics.reporter.**_reporter-name_**.class| |Samza automatically tracks various metrics which are useful for monitoring the health of a job, and you can also track your own metrics. With this property, you can define any number of metrics reporters which send the metrics to a system of your choice (for graphing, alerting etc). You give each reporter an arbitrary reporter-name. To enable the reporter, you need to reference the reporter-name in metrics.reporters. The value of this property is the fully-qualified name of a Java class that implements MetricsReporterFactory. Samza ships with these implementations by default: <br><br>`org.apache.samza.metrics.reporter.JmxReporterFactory`<br>With this reporter, every container exposes its own metrics as JMX MBeans. The JMX server is started on a random port to avoid collisions between containers running on the same machine.<br><br>`org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory`<br>This reporter sends the latest values of all metrics as messages to an output stream once per minute. The output stream is configured with metrics.reporter.*.stream and it can use any system supported by Samza.|
 |metrics.reporters| |If you have defined any metrics reporters with metrics.reporter.*.class, you need to list them here in order to enable them. The value of this property is a comma-separated list of reporter-name tokens.|
 |metrics.reporter.**_reporter-name_**.stream| |If you have registered the metrics reporter metrics.reporter.*.class = `org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory`, you need to set this property to configure the output stream to which the metrics data should be sent. The stream is given in the form system-name.stream-name, and the system must be defined in the job configuration. It's fine for many different jobs to publish their metrics to the same metrics stream. Samza defines a simple JSON encoding for metrics; in order to use this encoding, you also need to configure a serde for the metrics stream: <br><br>streams.*.samza.msg.serde = `metrics-serde` (replacing the asterisk with the stream-name of the metrics stream) <br>serializers.registry.metrics-serde.class = `org.apache.samza.serializers.MetricsSnapshotSerdeFactory` (registering the serde under a serde-name of metrics-serde)|
-|metrics.reporter.reporter-name.interval|60|If you have registered the metrics reporter `metrics.reporter.*.class` = `org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory`, you can use this property to configure how frequently the reporter will report the metrics registered with it. The value for this property should be length of the interval between consecutive metric reporting. This value is in seconds, and should be a positive integer value. This property is optional and set to 60 by default, which means metrics will be reported every 60 seconds.|
\ No newline at end of file
+|metrics.reporter.reporter-name.interval|60|If you have registered the metrics reporter `metrics.reporter.*.class` = `org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory`, you can use this property to configure how frequently the reporter will report the metrics registered with it. The value for this property should be length of the interval between consecutive metric reporting. This value is in seconds, and should be a positive integer value. This property is optional and set to 60 by default, which means metrics will be reported every 60 seconds.|

http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
index 2fed979..a762dec 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNodeConfigurationGenerator.java
@@ -105,7 +105,7 @@ import org.slf4j.LoggerFactory;
     for (StreamEdge inEdge : inEdges.values()) {
       String formattedSystemStream = inEdge.getName();
       if (inEdge.isBroadcast()) {
-        broadcastInputs.add(formattedSystemStream + "#0");
+        broadcastInputs.add(formattedSystemStream + "#[0-" + (inEdge.getPartitionCount() - 1) + "]");
       } else {
         inputs.add(formattedSystemStream);
       }
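
The hunk above changes the broadcast input entry from a fixed "#0" suffix to a partition range. A minimal sketch of that formatting follows; the class and method names are hypothetical, and only the string concatenation is taken from the diff. The sample values match the expectation in testBroadcastConfig (64 partitions).

```java
public class BroadcastInputFormat {

  // Hypothetical helper: mirrors the concatenation in the hunk above,
  // producing "<system>.<stream>#[0-<partitionCount - 1>]".
  static String formatBroadcastInput(String formattedSystemStream, int partitionCount) {
    if (partitionCount < 1) {
      // Guard added for this sketch only; the diff itself does not check this.
      throw new IllegalArgumentException("partitionCount must be >= 1");
    }
    return formattedSystemStream + "#[0-" + (partitionCount - 1) + "]";
  }

  public static void main(String[] args) {
    System.out.println(formatBroadcastInput("system1.input1", 64));
  }
}
```

With 64 partitions the last index in the range is 63, since partitions are numbered from 0.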

http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index f2f0310..ffced0f 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -53,9 +53,11 @@ public class StreamEdge {
   StreamEdge(StreamSpec streamSpec, boolean isIntermediate, boolean isBroadcast, Config config) {
     this.streamSpec = streamSpec;
     this.isIntermediate = isIntermediate;
-    this.isBroadcast = isBroadcast;
+    // broadcast can be configured either by an operator or via the configs
+    this.isBroadcast =
+          isBroadcast || (config != null && new StreamConfig(config).getBroadcastEnabled(streamSpec.toSystemStream()));
     this.config = config;
-    if (isBroadcast) {
+    if (isBroadcast && isIntermediate) {
       partitions = 1;
     }
     this.name = StreamUtil.getNameFromSystemStream(getSystemStream());
@@ -111,20 +113,21 @@ public class StreamEdge {
   }
 
   Config generateConfig() {
-    Map<String, String> config = new HashMap<>();
+    Map<String, String> newConfig = new HashMap<>();
     StreamSpec spec = getStreamSpec();
-    config.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName());
-    config.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
+    newConfig.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), spec.getId()), spec.getSystemName());
+    newConfig.put(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), spec.getId()), spec.getPhysicalName());
     if (isIntermediate()) {
-      config.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
-      config.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
-      config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
-      config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
+      newConfig.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
+      newConfig.put(String.format(StreamConfig.DELETE_COMMITTED_MESSAGES_FOR_STREAM_ID(), spec.getId()), "true");
+      newConfig.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
+      newConfig.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
     spec.getConfig().forEach((property, value) -> {
-        config.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
+        newConfig.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
       });
-    return new MapConfig(config);
+
+    return new MapConfig(newConfig);
   }
 
   public boolean isBroadcast() {
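
The constructor in this file combines an operator-supplied flag with a config lookup. In Java the conditional operator `?:` binds more loosely than `||`, so the grouping of such an expression decides whether the flag can take effect. A minimal sketch with hypothetical names (none of these methods exist in Samza) shows the difference:

```java
public class TernaryPrecedence {

  // Without extra parentheses this parses as
  // (flag || configMissing) ? false : configValue
  static boolean ungrouped(boolean flag, boolean configMissing, boolean configValue) {
    return flag || configMissing ? false : configValue;
  }

  // Explicit grouping: the flag wins; otherwise fall back to the
  // config value when a config is present.
  static boolean grouped(boolean flag, boolean configMissing, boolean configValue) {
    return flag || (configMissing ? false : configValue);
  }

  public static void main(String[] args) {
    // flag = true, config present but not set to true
    System.out.println(ungrouped(true, false, false));
    System.out.println(grouped(true, false, false));
  }
}
```

When the flag is false and a config is present, both forms return the config value, so a test that only sets the config property cannot tell them apart.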

http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 298c8ca..252e38f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -39,6 +39,7 @@ object StreamConfig {
   val IS_INTERMEDIATE =         SAMZA_PROPERTY + "intermediate"
   val DELETE_COMMITTED_MESSAGES = SAMZA_PROPERTY + "delete.committed.messages"
   val IS_BOUNDED =              SAMZA_PROPERTY + "bounded"
+  val BROADCAST =            SAMZA_PROPERTY + "broadcast"
 
   // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values.
   private val STREAMS_PREFIX = "streams."
@@ -53,6 +54,7 @@ object StreamConfig {
   val PRIORITY_FOR_STREAM_ID = STREAM_ID_PREFIX + PRIORITY
   val CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID = STREAM_ID_PREFIX + CONSUMER_OFFSET_DEFAULT
   val BOOTSTRAP_FOR_STREAM_ID = STREAM_ID_PREFIX + BOOTSTRAP
+  val BROADCAST_FOR_STREAM_ID = STREAM_ID_PREFIX + BROADCAST
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
@@ -85,6 +87,9 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getBootstrapEnabled(systemStream: SystemStream) =
     java.lang.Boolean.parseBoolean(getSamzaProperty(systemStream, StreamConfig.BOOTSTRAP))
 
+  def getBroadcastEnabled(systemStream: SystemStream) =
+    java.lang.Boolean.parseBoolean(getSamzaProperty(systemStream, StreamConfig.BROADCAST))
+
   def getPriority(systemStream: SystemStream) =
     java.lang.Integer.parseInt(getSamzaProperty(systemStream, StreamConfig.PRIORITY, "-1"))
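
As a rough illustration of how the new property is read, the sketch below looks up streams.<stream-id>.samza.broadcast in a plain Map and parses it as a boolean. It is a simplification under stated assumptions: the key pattern is taken from the documented property name, the class and method names are hypothetical, and the real getBroadcastEnabled takes a SystemStream and goes through getSamzaProperty rather than a direct map lookup.

```java
import java.util.HashMap;
import java.util.Map;

public class BroadcastConfigLookup {

  // Assumed key pattern, based on the documented property
  // streams.<stream-id>.samza.broadcast.
  static final String BROADCAST_FOR_STREAM_ID = "streams.%s.samza.broadcast";

  // Hypothetical stand-in for the lookup; not the Samza API.
  static boolean isBroadcastEnabled(Map<String, String> config, String streamId) {
    String value = config.get(String.format(BROADCAST_FOR_STREAM_ID, streamId));
    // Boolean.parseBoolean(null) returns false, so an unset property
    // behaves like the documented default of false.
    return Boolean.parseBoolean(value);
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(String.format(BROADCAST_FOR_STREAM_ID, "input1"), "true");

    System.out.println(isBroadcastEnabled(config, "input1"));
    System.out.println(isBroadcastEnabled(config, "input2"));
  }
}
```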
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
index f6a617c..474a382 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
@@ -81,8 +81,10 @@ public class TestStreamConfig {
     StreamConfig config = buildConfig("key1", "value1",
         buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2",
         buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.BROADCAST()), "true",
         buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME);
     assertEquals("value2", config.getSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+    assertEquals("true", config.getSamzaProperty(SYSTEM_STREAM_1, "samza.broadcast"));
   }
 
   // 11
@@ -228,6 +230,8 @@ public class TestStreamConfig {
 
     // Property set via systems.x.default.stream.* and system.x.streams.z.*
     assertEquals("5", config.getStreamProperties(STREAM3_STREAM_ID).get(nonSamzaProperty));
+
+    assertEquals(false, config.getBroadcastEnabled(SYSTEM_STREAM_1));
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 25d5071..26c8dde 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -40,6 +40,7 @@ import org.apache.samza.application.descriptors.TaskApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig$;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
@@ -67,6 +68,7 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.TestLocalTableDescriptor;
 import org.apache.samza.testUtils.StreamTestUtils;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -636,6 +638,22 @@ public class TestExecutionPlanner {
   }
 
   @Test
+  public void testBroadcastConfig() {
+    Map<String, String> map = new HashMap<>(config);
+    map.put(String.format(StreamConfig$.MODULE$.BROADCAST_FOR_STREAM_ID(), "input1"), "true");
+    Config cfg = new MapConfig(map);
+
+    ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
+    StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
+    JobGraph jobGraph = (JobGraph) planner.plan(graphSpec);
+
+    StreamEdge edge = jobGraph.getStreamEdge("input1");
+    Assert.assertTrue(edge.isBroadcast());
+    Config jobConfig = jobGraph.getJobConfigs().get(0);
+    Assert.assertEquals("system1.input1#[0-63]", jobConfig.get("task.broadcast.inputs"));
+  }
+
+  @Test
   public void testMaxPartitionLimit() {
     int partitionLimit = IntermediateStreamManager.MAX_INFERRED_PARTITIONS;
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
index 979be94..221484a 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraph.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Before;
 import org.junit.Test;
@@ -60,7 +61,7 @@ public class TestJobGraph {
    */
   private void createGraph1() {
     StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
-    graph1 = new JobGraph(null, appDesc);
+    graph1 = new JobGraph(new MapConfig(), appDesc);
 
     JobNode n2 = graph1.getOrCreateJobNode("2", "1");
     JobNode n3 = graph1.getOrCreateJobNode("3", "1");
@@ -94,7 +95,7 @@ public class TestJobGraph {
    */
   private void createGraph2() {
     StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
-    graph2 = new JobGraph(null, appDesc);
+    graph2 = new JobGraph(new MapConfig(), appDesc);
 
     JobNode n1 = graph2.getOrCreateJobNode("1", "1");
     JobNode n2 = graph2.getOrCreateJobNode("2", "1");
@@ -122,7 +123,7 @@ public class TestJobGraph {
    */
   private void createGraph3() {
     StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
-    graph3 = new JobGraph(null, appDesc);
+    graph3 = new JobGraph(new MapConfig(), appDesc);
 
     JobNode n1 = graph3.getOrCreateJobNode("1", "1");
     JobNode n2 = graph3.getOrCreateJobNode("2", "1");
@@ -139,7 +140,7 @@ public class TestJobGraph {
    */
   private void createGraph4() {
     StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
-    graph4 = new JobGraph(null, appDesc);
+    graph4 = new JobGraph(new MapConfig(), appDesc);
 
     JobNode n1 = graph4.getOrCreateJobNode("1", "1");
 
@@ -158,7 +159,7 @@ public class TestJobGraph {
   @Test
   public void testAddSource() {
     StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
-    JobGraph graph = new JobGraph(null, appDesc);
+    JobGraph graph = new JobGraph(new MapConfig(), appDesc);
 
     /**
      * s1 -> 1
@@ -200,7 +201,7 @@ public class TestJobGraph {
      * 2 -> s3
      */
     StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
-    JobGraph graph = new JobGraph(null, appDesc);
+    JobGraph graph = new JobGraph(new MapConfig(), appDesc);
     JobNode n1 = graph.getOrCreateJobNode("1", "1");
     JobNode n2 = graph.getOrCreateJobNode("2", "1");
     StreamSpec s1 = genStream();

http://git-wip-us.apache.org/repos/asf/samza/blob/b668b5be/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java b/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java
index 9b1fa4d..0aebd30 100644
--- a/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/StreamTestUtils.java
@@ -24,7 +24,7 @@ import org.apache.samza.config.StreamConfig$;
 public class StreamTestUtils {
 
   /**
-   * Adds the stream.stream-id.* configurations for the provided {@code streamId} to the provided {@code configs} Map.
+   * Adds the streams.stream-id.* configurations for the provided {@code streamId} to the provided {@code configs} Map.
    *
    * @param configs the configs Map to add the stream configurations to
    * @param streamId the id of the stream