You are viewing a plain-text version of this content; the canonical link to it (shown as "here" in the original message) was not preserved in this copy.
Posted to commits@samza.apache.org by na...@apache.org on 2016/05/05 17:59:17 UTC
samza git commit: SAMZA-944 - Broadcast stream is not added properly
in the prioritized tiers in the DefaultChooser
Repository: samza
Updated Branches:
refs/heads/master 668a952ac -> 63ccf5eb1
SAMZA-944 - Broadcast stream is not added properly in the prioritized tiers in the DefaultChooser
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/63ccf5eb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/63ccf5eb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/63ccf5eb
Branch: refs/heads/master
Commit: 63ccf5eb14f07840a61fdb8b44d2750019e56181
Parents: 668a952
Author: Jacob Maes <ja...@gmail.com>
Authored: Thu May 5 10:30:33 2016 -0700
Committer: Navina Ramesh <nr...@linkedin.com>
Committed: Thu May 5 10:30:33 2016 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 2 +
.../samza/config/DefaultChooserConfig.java | 85 ++++++++++++++++++++
.../org/apache/samza/config/TaskConfigJava.java | 34 +++++++-
.../samza/config/DefaultChooserConfig.scala | 50 ------------
.../samza/system/chooser/DefaultChooser.scala | 36 +++------
.../system/chooser/TestDefaultChooser.scala | 57 ++++++++++---
6 files changed, 177 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9afab88..8d77486 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -31,6 +31,8 @@
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka" />
<allow pkg="org.apache.commons" />
+ <allow class="scala.collection.JavaConversions" />
+ <allow class="scala.collection.JavaConverters" />
<subpackage name="config">
<allow class="org.apache.samza.SamzaException" />
http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
new file mode 100644
index 0000000..d242d14
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.SystemStream;
+
+
+/**
+ * Typed accessors for the {@link org.apache.samza.system.chooser.DefaultChooser} settings: batch size, bootstrap streams and per-stream priorities.
+ */
+public class DefaultChooserConfig extends MapConfig {
+ public static final String BOOTSTRAP = StreamConfig.STREAM_PREFIX() + "samza.bootstrap"; // key template formatted with (system, stream), e.g. systems.kafka.streams.bar-baz.samza.bootstrap
+ public static final String PRIORITY = StreamConfig.STREAM_PREFIX() + "samza.priority"; // same per-stream template; the value is read as an int priority
+ public static final String BATCH_SIZE = "task.consumer.batch.size"; // plain key, not a per-stream template
+
+ private final TaskConfigJava taskConfigJava; // enumerates both the standard and the broadcast input streams
+
+ public DefaultChooserConfig(Config config) {
+ super(config);
+ taskConfigJava = new TaskConfigJava(config);
+ }
+
+ /**
+ * @return the value of task.consumer.batch.size, or 0 when the key is absent (DefaultChooser only uses values greater than 0).
+ */
+ public int getChooserBatchSize() {
+ return getInt(BATCH_SIZE, 0);
+ }
+
+ /**
+ * @return an unmodifiable set of the input streams (standard and broadcast) whose samza.bootstrap flag is true.
+ */
+ public Set<SystemStream> getBootstrapStreams() {
+ Set<SystemStream> bootstrapInputs = new HashSet<>();
+ Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
+ for (SystemStream systemStream : allInputs) {
+ if (getBoolean(String.format(BOOTSTRAP, systemStream.getSystem(), systemStream.getStream()), false)) { // an absent flag means "not a bootstrap stream"
+ bootstrapInputs.add(systemStream);
+ }
+ }
+ return Collections.unmodifiableSet(bootstrapInputs);
+ }
+
+ /**
+ * Gets the priority of every input stream (standard or broadcast) whose
+ * samza.priority was explicitly configured with a value >= 0.
+ *
+ * @return an unmodifiable map from SystemStream to its configured priority.
+ * Streams with no configured priority are omitted. A negative value is treated
+ * the same as an absent one, because both map to the -1 sentinel.
+ */
+ public Map<SystemStream, Integer> getPriorityStreams() {
+ Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
+
+ Map<SystemStream, Integer> priorityStreams = new HashMap<>();
+ for (SystemStream systemStream : allInputs) {
+ int priority = getInt(String.format(PRIORITY, systemStream.getSystem(), systemStream.getStream()), -1); // -1 is the "not configured" sentinel
+ if (priority >= 0) {
+ priorityStreams.put(systemStream, priority);
+ }
+ }
+ return Collections.unmodifiableMap(priorityStreams);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 8acb6ca..021d42a 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -19,6 +19,7 @@
package org.apache.samza.config;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -30,6 +31,8 @@ import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
public class TaskConfigJava extends MapConfig {
// broadcast streams consumed by all tasks. e.g. kafka.foo#1
@@ -52,7 +55,7 @@ public class TaskConfigJava extends MapConfig {
*/
public Set<SystemStreamPartition> getBroadcastSystemStreamPartitions() {
HashSet<SystemStreamPartition> systemStreamPartitionSet = new HashSet<SystemStreamPartition>();
- List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS);
+ List<String> systemStreamPartitions = getList(BROADCAST_INPUT_STREAMS, Collections.<String>emptyList());
for (String systemStreamPartition : systemStreamPartitions) {
int hashPosition = systemStreamPartition.indexOf("#");
@@ -85,4 +88,33 @@ public class TaskConfigJava extends MapConfig {
}
return systemStreamPartitionSet;
}
+
+ /**
+ * Gets the distinct SystemStreams behind the configured broadcast input partitions (BROADCAST_INPUT_STREAMS).
+ *
+ * @return an unmodifiable set with one entry per broadcast stream, regardless of how many of its partitions are listed; empty if none are configured.
+ */
+ public Set<SystemStream> getBroadcastSystemStreams() {
+ Set<SystemStream> broadcastSS = new HashSet<>();
+ Set<SystemStreamPartition> broadcastSSPs = getBroadcastSystemStreamPartitions();
+ for (SystemStreamPartition bssp : broadcastSSPs) {
+ broadcastSS.add(bssp.getSystemStream()); // partitions of one stream collapse into a single set entry
+ }
+ return Collections.unmodifiableSet(broadcastSS);
+ }
+
+ /**
+ * Gets every input SystemStream of the task: the standard inputs plus the broadcast streams.
+ *
+ * @return an unmodifiable set; a stream listed as both a standard and a broadcast input appears only once.
+ */
+ public Set<SystemStream> getAllInputStreams() {
+ Set<SystemStream> allInputSS = new HashSet<>();
+
+ TaskConfig taskConfig = TaskConfig.Config2Task(this); // reuse the Scala TaskConfig to parse the standard inputs
+ allInputSS.addAll(JavaConversions.asJavaSet(taskConfig.getInputStreams())); // Scala Set -> java.util.Set
+ allInputSS.addAll(getBroadcastSystemStreams());
+
+ return Collections.unmodifiableSet(allInputSS);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
deleted file mode 100644
index 4224393..0000000
--- a/samza-core/src/main/scala/org/apache/samza/config/DefaultChooserConfig.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config
-
-import org.apache.samza.system.SystemStream
-import TaskConfig._
-
-object DefaultChooserConfig {
- val BOOTSTRAP = StreamConfig.STREAM_PREFIX + "samza.bootstrap"
- val PRIORITY = StreamConfig.STREAM_PREFIX + "samza.priority"
- val BATCH_SIZE = "task.consumer.batch.size"
-
- implicit def Config2DefaultChooser(config: Config) = new DefaultChooserConfig(config)
-}
-
-class DefaultChooserConfig(config: Config) extends ScalaMapConfig(config) {
- import DefaultChooserConfig._
-
- def getChooserBatchSize = getOption(BATCH_SIZE)
-
- def getBootstrapStreams = config
- .getInputStreams
- .map(systemStream => (systemStream, getOrElse(BOOTSTRAP format (systemStream.getSystem, systemStream.getStream), "false").equals("true")))
- .filter(_._2.equals(true))
- .map(_._1)
-
- def getPriorityStreams = config
- .getInputStreams
- .map(systemStream => (systemStream, getOrElse(PRIORITY format (systemStream.getSystem, systemStream.getStream), "-1").toInt))
- .filter(_._2 >= 0)
- .toMap
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
index 95bd188..b433713 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
@@ -19,50 +19,40 @@
package org.apache.samza.system.chooser
-import scala.collection.JavaConversions._
import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
-import org.apache.samza.config.SystemConfig._
-import org.apache.samza.config.DefaultChooserConfig._
-import org.apache.samza.config.TaskConfig._
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.Util
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava}
+import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
import org.apache.samza.util.Logging
+import scala.collection.JavaConverters._
+
+
object DefaultChooser extends Logging {
def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry) = {
- val batchSize = config.getChooserBatchSize match {
- case Some(batchSize) => Some(batchSize.toInt)
- case _ => None
- }
+ val chooserConfig = new DefaultChooserConfig(config)
+ val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None
debug("Got batch size: %s" format batchSize)
// Normal streams default to priority 0.
- val defaultPrioritizedStreams = config
- .getInputStreams
+ val defaultPrioritizedStreams = new TaskConfigJava(config)
+ .getAllInputStreams.asScala
.map((_, 0))
.toMap
debug("Got default priority streams: %s" format defaultPrioritizedStreams)
// Bootstrap streams default to Int.MaxValue priority.
- val prioritizedBootstrapStreams = config
- .getBootstrapStreams
+ val prioritizedBootstrapStreams = chooserConfig
+ .getBootstrapStreams.asScala
.map((_, Int.MaxValue))
.toMap
debug("Got bootstrap priority streams: %s" format prioritizedBootstrapStreams)
// Explicitly prioritized streams are set to whatever they were configured to.
- val prioritizedStreams = config.getPriorityStreams
+ val prioritizedStreams = chooserConfig.getPriorityStreams.asScala.mapValues(_.asInstanceOf[Int])
debug("Got prioritized streams: %s" format prioritizedStreams)
http://git-wip-us.apache.org/repos/asf/samza/blob/63ccf5eb/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
index 0909956..7fb70b2 100644
--- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
+++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
@@ -19,17 +19,13 @@
package org.apache.samza.system.chooser
-import org.junit.Assert._
-import org.junit.Test
-import org.apache.samza.util.BlockingEnvelopeMap
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.DefaultChooserConfig
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.config.{DefaultChooserConfig, MapConfig}
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.util.BlockingEnvelopeMap
+import org.junit.Assert._
+import org.junit.Test
import scala.collection.JavaConversions._
@@ -141,11 +137,10 @@ class TestDefaultChooser {
@Test
def testBootstrapConfig {
- import DefaultChooserConfig.Config2DefaultChooser
val configMap = Map(
"task.inputs" -> "kafka.foo,kafka.bar-baz",
"systems.kafka.streams.bar-baz.samza.bootstrap" -> "true")
- val config = new MapConfig(configMap)
+ val config = new DefaultChooserConfig(new MapConfig(configMap))
val bootstrapStreams = config.getBootstrapStreams
assertEquals(1, bootstrapStreams.size)
assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bar-baz")))
@@ -153,15 +148,51 @@ class TestDefaultChooser {
@Test
def testPriorityConfig {
- import DefaultChooserConfig.Config2DefaultChooser
val configMap = Map(
"task.inputs" -> "kafka.foo,kafka.bar-baz",
"systems.kafka.streams.bar-baz.samza.priority" -> "3")
- val config = new MapConfig(configMap)
+ val config = new DefaultChooserConfig(new MapConfig(configMap)) // explicit wrapper replaces the removed implicit conversion
val priorityStreams = config.getPriorityStreams
assertEquals(1, priorityStreams.size)
assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
}
+
+ @Test
+ def testBroadcastOnlyConfig { // SAMZA-944: priority/bootstrap settings must apply when the only inputs are broadcast streams
+ val configMap = Map(
+ "task.broadcast.inputs" -> "kafka.foo#[1-2],kafka.bar-baz#5,kafka.fizz#0",
+ "systems.kafka.streams.bar-baz.samza.priority" -> "3",
+ "systems.kafka.streams.fizz.samza.bootstrap" -> "true")
+ val config = new DefaultChooserConfig(new MapConfig(configMap))
+ val priorityStreams = config.getPriorityStreams
+ assertEquals(1, priorityStreams.size) // only bar-baz has an explicit priority
+ assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+
+ val bootstrapStreams = config.getBootstrapStreams
+ assertEquals(1, bootstrapStreams.size()) // only fizz is flagged as a bootstrap stream
+ assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "fizz")))
+ }
+
+ @Test
+ def testBroadcastAndStandardInputConfig { // settings must be picked up for standard and broadcast inputs alike
+ val configMap = Map(
+ "task.broadcast.inputs" -> "kafka.foo#[1-2],kafka.bar-baz#5,kafka.fizz#0",
+ "task.inputs" -> "kafka.bootstrapTopic,kafka.priorityTopic,kafka.normalTopic",
+ "systems.kafka.streams.priorityTopic.samza.priority" -> "2",
+ "systems.kafka.streams.bar-baz.samza.priority" -> "3",
+ "systems.kafka.streams.bootstrapTopic.samza.bootstrap" -> "true",
+ "systems.kafka.streams.fizz.samza.bootstrap" -> "true")
+ val config = new DefaultChooserConfig(new MapConfig(configMap))
+ val priorityStreams = config.getPriorityStreams
+ assertEquals(2, priorityStreams.size) // priorityTopic (standard input) and bar-baz (broadcast input)
+ assertEquals(2, priorityStreams(new SystemStream("kafka", "priorityTopic")))
+ assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+
+ val bootstrapStreams = config.getBootstrapStreams
+ assertEquals(2, bootstrapStreams.size()) // bootstrapTopic (standard input) and fizz (broadcast input)
+ assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bootstrapTopic")))
+ assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "fizz")))
+ }
}
class MockBlockingEnvelopeMap extends BlockingEnvelopeMap {