You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2016/05/05 17:59:17 UTC

samza git commit: SAMZA-944 - Broadcast stream is not added properly in the prioritized tiers in the DefaultChooser

Repository: samza
Updated Branches:
  refs/heads/master 668a952ac -> 63ccf5eb1


SAMZA-944 - Broadcast stream is not added properly in the prioritized tiers in the DefaultChooser


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/63ccf5eb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/63ccf5eb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/63ccf5eb

Branch: refs/heads/master
Commit: 63ccf5eb14f07840a61fdb8b44d2750019e56181
Parents: 668a952
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu May 5 10:30:33 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Thu May 5 10:30:33 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  2 +
 .../samza/config/DefaultChooserConfig.java      | 85 ++++++++++++++++++++
 .../org/apache/samza/config/TaskConfigJava.java | 34 +++++++-
 .../samza/config/DefaultChooserConfig.scala     | 50 ------------
 .../samza/system/chooser/DefaultChooser.scala   | 36 +++------
 .../system/chooser/TestDefaultChooser.scala     | 57 ++++++++++---
 6 files changed, 177 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9afab88..8d77486 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -31,6 +31,8 @@
     <allow pkg="org.apache.log4j" />
     <allow pkg="org.apache.kafka" />
     <allow pkg="org.apache.commons" />
+    <allow class="scala.collection.JavaConversions" />
+    <allow class="scala.collection.JavaConverters" />
 
     <subpackage name="config">
         <allow class="org.apache.samza.SamzaException" />

http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
new file mode 100644
index 0000000..d242d14
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * A convenience class for fetching configs related to the {@link org.apache.samza.system.chooser.DefaultChooser}.
+ */
+public class DefaultChooserConfig extends MapConfig {
+  public static final String BOOTSTRAP = StreamConfig.STREAM_PREFIX() + "samza.bootstrap";
+  public static final String PRIORITY = StreamConfig.STREAM_PREFIX() + "samza.priority";
+  public static final String BATCH_SIZE = "task.consumer.batch.size";
+
+  private final TaskConfigJava taskConfigJava;
+
+  public DefaultChooserConfig(Config config) {
+    super(config);
+    taskConfigJava = new TaskConfigJava(config);
+  }
+
+  /**
+   * @return  the configured batch size, or 0 if it was not configured.
+   */
+  public int getChooserBatchSize() {
+    return getInt(BATCH_SIZE, 0);
+  }
+
+  /**
+   * @return  the set of SystemStreams which were configured as bootstrap streams.
+   */
+  public Set<SystemStream> getBootstrapStreams() {
+    Set<SystemStream> bootstrapInputs = new HashSet<>();
+    Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
+    for (SystemStream systemStream : allInputs) {
+      if (getBoolean(String.format(BOOTSTRAP, systemStream.getSystem(), systemStream.getStream()), false)) {
+        bootstrapInputs.add(systemStream);
+      }
+    }
+    return Collections.unmodifiableSet(bootstrapInputs);
+  }
+
+  /**
+   * Gets the priority of every SystemStream for which the priority
+   * was explicitly configured with a value &gt;=0.
+   *
+   * @return  the explicitly-configured stream priorities as a map from
+   *          SystemStream to the configured priority value. Streams that
+   *          were not explicitly configured with a priority are not returned.
+   */
+  public Map<SystemStream, Integer> getPriorityStreams() {
+    Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
+
+    Map<SystemStream, Integer> priorityStreams = new HashMap<>();
+    for (SystemStream systemStream : allInputs) {
+      int priority = getInt(String.format(PRIORITY, systemStream.getSystem(), systemStream.getStream()), -1);
+      if (priority >= 0) {
+        priorityStreams.put(systemStream, priority);
+      }
+    }
+    return Collections.unmodifiableMap(priorityStreams);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 8acb6ca..021d42a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.config;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -30,6 +31,8 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
 
 public class TaskConfigJava extends MapConfig {
   // broadcast streams consumed by all tasks. e.g. kafka.foo#1
@@ -52,7 +55,7 @@ public class TaskConfigJava extends MapConfig {
    */
   public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
     HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<SystemStreamPartition>();
-    List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS);
+    List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS, Collections.<String>emptyList());
 
     for (String systemStreamPartition : systemStreamPartitions) {
       int hashPosition = systemStreamPartition.indexOf("#");
@@ -85,4 +88,33 @@ public class TaskConfigJava extends MapConfig {
     }
     return systemStreamPartitionSet;
   }
+
+  /**
+   * Get the SystemStreams for the configured broadcast streams.
+   *
+   * @return the set of SystemStreams for which there are broadcast stream SSPs configured.
+   */
+  public Set<SystemStream> getBroadcastSystemStreams() {
+    Set<SystemStream> broadcastSS = new HashSet<>();
+    Set<SystemStreamPartition> broadcastSSPs = getBroadcastSystemStreamPartitions();
+    for (SystemStreamPartition bssp : broadcastSSPs) {
+      broadcastSS.add(bssp.getSystemStream());
+    }
+    return Collections.unmodifiableSet(broadcastSS);
+  }
+
+  /**
+   * Get the SystemStreams for the configured input and broadcast streams.
+   *
+   * @return the set of SystemStreams for both standard inputs and broadcast stream inputs.
+   */
+  public Set<SystemStream> getAllInputStreams() {
+    Set<SystemStream> allInputSS = new HashSet<>();
+
+    TaskConfig taskConfig = TaskConfig.Config2Task(this);
+    allInputSS.addAll(JavaConversions.asJavaSet(taskConfig.getInputStreams()));
+    allInputSS.addAll(getBroadcastSystemStreams());
+
+    return Collections.unmodifiableSet(allInputSS);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
deleted file mode 100644
index 4224393..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-import org.apache.samza.system.SystemStream
-import TaskConfig._
-
-object DefaultChooserConfig {
-  val BOOTSTRAP = StreamConfig.STREAM_PREFIX + "samza.bootstrap"
-  val PRIORITY = StreamConfig.STREAM_PREFIX + "samza.priority"
-  val BATCH_SIZE = "task.consumer.batch.size"
-
-  implicit def Config2DefaultChooser(config: Config) = new DefaultChooserConfig(config)
-}
-
-class DefaultChooserConfig(config: Config) extends ScalaMapConfig(config) {
-  import DefaultChooserConfig._
-
-  def getChooserBatchSize = getOption(BATCH_SIZE)
-
-  def getBootstrapStreams = config
-    .getInputStreams
-    .map(systemStream => (systemStream, getOrElse(BOOTSTRAP format (systemStream.getSystem, systemStream.getStream), "false").equals("true")))
-    .filter(_._2.equals(true))
-    .map(_._1)
-
-  def getPriorityStreams = config
-    .getInputStreams
-    .map(systemStream => (systemStream, getOrElse(PRIORITY format (systemStream.getSystem, systemStream.getStream), "-1").toInt))
-    .filter(_._2 >= 0)
-    .toMap
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
index 95bd188..b433713 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
@@ -19,50 +19,40 @@
 
 package org.apache.samza.system.chooser
 
-import scala.collection.JavaConversions._
 import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
-import org.apache.samza.config.SystemConfig._
-import org.apache.samza.config.DefaultChooserConfig._
-import org.apache.samza.config.TaskConfig._
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.Util
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava}
+import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
 import org.apache.samza.util.Logging
 
+import scala.collection.JavaConverters._
+
+
 object DefaultChooser extends Logging {
   def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry) = {
-    val batchSize = config.getChooserBatchSize match {
-      case Some(batchSize) => Some(batchSize.toInt)
-      case _ => None
-    }
+    val chooserConfig = new DefaultChooserConfig(config)
+    val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None
 
     debug("Got batch size: %s" format batchSize)
 
     // Normal streams default to priority 0.
-    val defaultPrioritizedStreams = config
-      .getInputStreams
+    val defaultPrioritizedStreams = new TaskConfigJava(config)
+      .getAllInputStreams.asScala
       .map((_, 0))
       .toMap
 
     debug("Got default priority streams: %s" format defaultPrioritizedStreams)
 
     // Bootstrap streams default to Int.MaxValue priority.
-    val prioritizedBootstrapStreams = config
-      .getBootstrapStreams
+    val prioritizedBootstrapStreams = chooserConfig
+      .getBootstrapStreams.asScala
       .map((_, Int.MaxValue))
       .toMap
 
     debug("Got bootstrap priority streams: %s" format prioritizedBootstrapStreams)
 
     // Explicitly prioritized streams are set to whatever they were configured to.
-    val prioritizedStreams = config.getPriorityStreams
+    val prioritizedStreams = chooserConfig.getPriorityStreams.asScala.mapValues(_.asInstanceOf[Int])
 
     debug("Got prioritized streams: %s" format prioritizedStreams)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 0909956..7fb70b2 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -19,17 +19,13 @@
 
 package org.apache.samza.system.chooser
 
-import org.junit.Assert._
-import org.junit.Test
-import org.apache.samza.util.BlockingEnvelopeMap
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.DefaultChooserConfig
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.config.{DefaultChooserConfig, MapConfig}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.junit.Assert._
+import org.junit.Test
 
 import scala.collection.JavaConversions._
 
@@ -141,11 +137,10 @@ class TestDefaultChooser {
 
   @Test
   def testBootstrapConfig {
-    import DefaultChooserConfig.Config2DefaultChooser
     val configMap = Map(
       "task.inputs" -> "kafka.foo,kafka.bar-baz",
       "systems.kafka.streams.bar-baz.samza.bootstrap" -> "true")
-    val config = new MapConfig(configMap)
+    val config = new DefaultChooserConfig(new MapConfig(configMap))
     val bootstrapStreams = config.getBootstrapStreams
     assertEquals(1, bootstrapStreams.size)
     assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bar-baz")))
@@ -153,15 +148,51 @@ class TestDefaultChooser {
 
   @Test
   def testPriorityConfig {
-    import DefaultChooserConfig.Config2DefaultChooser
     val configMap = Map(
       "task.inputs" -> "kafka.foo,kafka.bar-baz",
       "systems.kafka.streams.bar-baz.samza.priority" -> "3")
-    val config = new MapConfig(configMap)
+    val config = new DefaultChooserConfig(new MapConfig(configMap))
     val priorityStreams = config.getPriorityStreams
     assertEquals(1, priorityStreams.size)
     assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
   }
+
+  @Test
+  def testBroadcastOnlyConfig {
+    val configMap = Map(
+      "task.broadcast.inputs" -> "kafka.foo#[1-2],kafka.bar-baz#5,kafka.fizz#0",
+      "systems.kafka.streams.bar-baz.samza.priority" -> "3",
+      "systems.kafka.streams.fizz.samza.bootstrap" -> "true")
+    val config = new DefaultChooserConfig(new MapConfig(configMap))
+    val priorityStreams = config.getPriorityStreams
+    assertEquals(1, priorityStreams.size)
+    assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+
+    val bootstrapStreams = config.getBootstrapStreams
+    assertEquals(1, bootstrapStreams.size())
+    assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "fizz")))
+  }
+
+  @Test
+  def testBroadcastAndStandardInputConfig {
+    val configMap = Map(
+      "task.broadcast.inputs" -> "kafka.foo#[1-2],kafka.bar-baz#5,kafka.fizz#0",
+      "task.inputs" -> "kafka.bootstrapTopic,kafka.priorityTopic,kafka.normalTopic",
+      "systems.kafka.streams.priorityTopic.samza.priority" -> "2",
+      "systems.kafka.streams.bar-baz.samza.priority" -> "3",
+      "systems.kafka.streams.bootstrapTopic.samza.bootstrap" -> "true",
+      "systems.kafka.streams.fizz.samza.bootstrap" -> "true")
+    val config = new DefaultChooserConfig(new MapConfig(configMap))
+    val priorityStreams = config.getPriorityStreams
+    assertEquals(2, priorityStreams.size)
+    assertEquals(2, priorityStreams(new SystemStream("kafka", "priorityTopic")))
+    assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+
+    val bootstrapStreams = config.getBootstrapStreams
+    assertEquals(2, bootstrapStreams.size())
+    assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bootstrapTopic")))
+    assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "fizz")))
+  }
 }
 
 class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {