You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/03/29 20:57:53 UTC

samza git commit: SAMZA-1167: New streamId-specific configs do not override equivalent system-scoped configs

Repository: samza
Updated Branches:
  refs/heads/master b938df623 -> daed7a9a3


SAMZA-1167: New streamId-specific configs do not override equivalent system-scoped configs

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Xinyu Liu <xi...@linkedin.com>

Closes #101 from jmakes/samza-1167


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/daed7a9a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/daed7a9a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/daed7a9a

Branch: refs/heads/master
Commit: daed7a9a31f30dd5d993407b33b5dd30f4ef7dda
Parents: b938df6
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Mar 29 13:57:26 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Mar 29 13:57:26 2017 -0700

----------------------------------------------------------------------
 .../samza/config/DefaultChooserConfig.java      |  10 +-
 .../samza/checkpoint/CheckpointTool.scala       |   2 +-
 .../org/apache/samza/config/StreamConfig.scala  | 146 +++++++++----
 .../apache/samza/config/TestStreamConfig.java   | 215 +++++++++++++++++++
 .../runtime/TestAbstractApplicationRunner.java  |   1 +
 5 files changed, 332 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
index 237c6f9..ff344c9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/DefaultChooserConfig.java
@@ -30,15 +30,15 @@ import org.apache.samza.system.SystemStream;
  * A convenience class for fetching configs related to the {@link org.apache.samza.system.chooser.DefaultChooser}
  */
 public class DefaultChooserConfig extends MapConfig {
-  public static final String BOOTSTRAP = StreamConfig.STREAM_PREFIX() + "samza.bootstrap";
-  public static final String PRIORITY = StreamConfig.STREAM_PREFIX() + "samza.priority";
-  public static final String BATCH_SIZE = "task.consumer.batch.size";
+  private static final String BATCH_SIZE = "task.consumer.batch.size";
 
   private final TaskConfigJava taskConfigJava;
+  private final StreamConfig streamConfig;
 
   public DefaultChooserConfig(Config config) {
     super(config);
     taskConfigJava = new TaskConfigJava(config);
+    streamConfig = new StreamConfig(config);
   }
 
   /**
@@ -55,7 +55,7 @@ public class DefaultChooserConfig extends MapConfig {
     Set<SystemStream> bootstrapInputs = new HashSet<>();
     Set<SystemStream> allInputs = taskConfigJava.getAllInputStreams();
     for (SystemStream systemStream : allInputs) {
-      if (getBoolean(String.format(BOOTSTRAP, systemStream.getSystem(), systemStream.getStream()), false)) {
+      if (streamConfig.getBootstrapEnabled(systemStream)) {
         bootstrapInputs.add(systemStream);
       }
     }
@@ -75,7 +75,7 @@ public class DefaultChooserConfig extends MapConfig {
 
     Map<SystemStream, Integer> priorityStreams = new HashMap<>();
     for (SystemStream systemStream : allInputs) {
-      int priority = getInt(String.format(PRIORITY, systemStream.getSystem(), systemStream.getStream()), -1);
+      int priority = streamConfig.getPriority(systemStream);
       if (priority >= 0) {
         priorityStreams.put(systemStream, priority);
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 449b402..9f4aa54 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -66,7 +66,7 @@ import scala.collection.immutable.HashMap
  */
 object CheckpointTool {
   /** Format in which SystemStreamPartition is represented in a properties file */
-  val SSP_PATTERN = "tasknames.%s." + StreamConfig.STREAM_PREFIX + "partitions.%d"
+  val SSP_PATTERN = "tasknames.%s.systems.%s.streams.%s.partitions.%d"
   val SSP_REGEX = Pattern.compile("tasknames\\.(.+)\\.systems\\.(.+)\\.streams\\.(.+)\\.partitions\\.([0-9]+)")
 
   type TaskNameToCheckpointMap = Map[TaskName, Map[SystemStreamPartition, String]]

http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 4cce32f..4f2c688 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -34,25 +34,27 @@ object StreamConfig {
   val KEY_SERDE =               SAMZA_PROPERTY + "key.serde"
   val CONSUMER_RESET_OFFSET =   SAMZA_PROPERTY + "reset.offset"
   val CONSUMER_OFFSET_DEFAULT = SAMZA_PROPERTY + "offset.default"
+  val BOOTSTRAP =               SAMZA_PROPERTY + "bootstrap"
+  val PRIORITY =                SAMZA_PROPERTY + "priority"
 
-  val STREAMS_PREFIX = "streams."
-  val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
-  val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
-  val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
-  val SAMZA_STREAM_PROPERTY_PREFIX = STREAM_ID_PREFIX + SAMZA_PROPERTY
+  // We don't want any external dependencies on these patterns while both exist. Use getProperty to ensure proper values.
+  private val STREAMS_PREFIX = "streams."
+  private val STREAM_PREFIX = "systems.%s.streams.%s."
 
-  val STREAM_PREFIX = "systems.%s.streams.%s."
+  protected val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
+  protected val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
+  protected val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
 
 class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
-  def getStreamMsgSerde(systemStream: SystemStream) = nonEmptyOption(getProperty(systemStream, StreamConfig.MSG_SERDE))
+  def getStreamMsgSerde(systemStream: SystemStream) = nonEmptyOption(getSamzaProperty(systemStream, StreamConfig.MSG_SERDE))
 
-  def getStreamKeySerde(systemStream: SystemStream) = nonEmptyOption(getProperty(systemStream, StreamConfig.KEY_SERDE))
+  def getStreamKeySerde(systemStream: SystemStream) = nonEmptyOption(getSamzaProperty(systemStream, StreamConfig.KEY_SERDE))
 
   def getResetOffset(systemStream: SystemStream) =
-    Option(getProperty(systemStream, StreamConfig.CONSUMER_RESET_OFFSET)) match {
+    Option(getSamzaProperty(systemStream, StreamConfig.CONSUMER_RESET_OFFSET)) match {
       case Some("true") => true
       case Some("false") => false
       case Some(resetOffset) =>
@@ -62,8 +64,18 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
       case _ => false
     }
 
+  def isResetOffsetConfigured(systemStream: SystemStream) = containsSamzaProperty(systemStream, StreamConfig.CONSUMER_RESET_OFFSET)
+
   def getDefaultStreamOffset(systemStream: SystemStream) =
-    Option(getProperty(systemStream, StreamConfig.CONSUMER_OFFSET_DEFAULT))
+    Option(getSamzaProperty(systemStream, StreamConfig.CONSUMER_OFFSET_DEFAULT))
+
+  def isDefaultStreamOffsetConfigured(systemStream: SystemStream) = containsSamzaProperty(systemStream, StreamConfig.CONSUMER_OFFSET_DEFAULT)
+
+  def getBootstrapEnabled(systemStream: SystemStream) =
+    java.lang.Boolean.parseBoolean(getSamzaProperty(systemStream, StreamConfig.BOOTSTRAP))
+
+  def getPriority(systemStream: SystemStream) =
+    java.lang.Integer.parseInt(getSamzaProperty(systemStream, StreamConfig.PRIORITY, "-1"))
 
   /**
    * Returns a list of all SystemStreams that have a serde defined from the config file.
@@ -81,30 +93,14 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     val systemStreams = subset(StreamConfig.STREAMS_PREFIX)
       .keys
       .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
-      .map(k => {
-        val streamId = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
-        streamIdToSystemStream(streamId)
-      }).toSet
+      .map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ))
+      .filter(streamId => systemName.equals(getSystem(streamId)))
+      .map(streamId => streamIdToSystemStream(streamId)).toSet
 
     legacySystemStreams.union(systemStreams)
   }
 
   /**
-    * Gets the stream properties from the legacy config style:
-    * systems.{system}.streams.{streams}.*
-    *
-    * @param systemName the system name under which the properties are configured
-    * @param streamName the stream name
-    * @return           the map of properties for the stream
-    */
-  private def getSystemStreamProperties(systemName: String, streamName: String) = {
-    if (systemName == null || streamName == null) {
-      Map()
-    }
-    config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
-  }
-
-  /**
     * Gets the properties for the specified streamId from the config.
     * It first applies any legacy configs from this config location:
     * systems.{system}.streams.{stream}.*
@@ -112,15 +108,16 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     * It then overrides them with properties of the new config format:
     * streams.{streamId}.*
     *
+    * Only returns properties of the stream itself, not any of the samza properties for the stream.
+    *
     * @param streamId the identifier for the stream in the config.
     * @return         the merged map of config properties from both the legacy and new config styles
     */
   def getStreamProperties(streamId: String) = {
-    val allProperties = subset(StreamConfig.STREAM_ID_PREFIX format streamId)
+    val allProperties = getAllStreamProperties(streamId)
     val samzaProperties = allProperties.subset(StreamConfig.SAMZA_PROPERTY, false)
     val filteredStreamProperties:java.util.Map[String, String] = allProperties.filterKeys(k => !samzaProperties.containsKey(k))
-    val inheritedLegacyProperties:java.util.Map[String, String] = getSystemStreamProperties(getSystem(streamId), getPhysicalName(streamId))
-    new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, filteredStreamProperties))
+    new MapConfig(filteredStreamProperties)
   }
 
   /**
@@ -153,14 +150,21 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   }
 
   /**
-    * Gets the specified property for a SystemStream.
+    * Gets the specified Samza property for a SystemStream. A Samza property is a property that controls how Samza
+    * interacts with the stream, as opposed to a property of the stream itself.
     *
     * Note, because the translation is not perfect between SystemStream and streamId,
     * this method is not identical to getProperty(streamId, property)
+    *
+    * @param systemStream the SystemStream for which the property value will be retrieved.
+    * @param property the samza property name excluding the leading delimiter. e.g. "samza.x.y"
     */
-  private def getProperty(systemStream: SystemStream, property: String): String = {
-    val streamVal = getStreamProperties(systemStreamToStreamId(systemStream)).get(property)
+  protected def getSamzaProperty(systemStream: SystemStream, property: String): String = {
+    if (!property.startsWith(StreamConfig.SAMZA_PROPERTY)) {
+      throw new IllegalArgumentException("Attempt to fetch a non samza property for SystemStream %s named %s" format(systemStream, property))
+    }
 
+    val streamVal = getAllStreamProperties(systemStreamToStreamId(systemStream)).get(property)
     if (streamVal != null) {
       streamVal
     } else {
@@ -168,8 +172,78 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     }
   }
 
+  /**
+    * Gets the specified Samza property for a SystemStream. A Samza property is a property that controls how Samza
+    * interacts with the stream, as opposed to a property of the stream itself.
+    *
+    * Note, because the translation is not perfect between SystemStream and streamId,
+    * this method is not identical to getProperty(streamId, property)
+    *
+    * @param systemStream the SystemStream for which the property value will be retrieved.
+    * @param property the samza property name excluding the leading delimiter. e.g. "samza.x.y"
+    * @param defaultValue the default value to use if the property value is not found
+    *
+    */
+  protected def getSamzaProperty(systemStream: SystemStream, property: String, defaultValue: String): String = {
+    val streamVal = getSamzaProperty(systemStream, property)
+
+    if (streamVal != null) {
+      streamVal
+    } else {
+      defaultValue
+    }
+  }
+
+  /**
+    * Gets the specified Samza property for a SystemStream. A Samza property is a property that controls how Samza
+    * interacts with the stream, as opposed to a property of the stream itself.
+    *
+    * Note, because the translation is not perfect between SystemStream and streamId,
+    * this method is not identical to getProperty(streamId, property)
+    */
+  protected def containsSamzaProperty(systemStream: SystemStream, property: String): Boolean = {
+    if (!property.startsWith(StreamConfig.SAMZA_PROPERTY)) {
+      throw new IllegalArgumentException("Attempt to fetch a non samza property for SystemStream %s named %s" format(systemStream, property))
+    }
+    return getSamzaProperty(systemStream, property) != null
+  }
+
+
+  /**
+    * Gets the stream properties from the legacy config style:
+    * systems.{system}.streams.{streams}.*
+    *
+    * @param systemName the system name under which the properties are configured
+    * @param streamName the stream name
+    * @return           the map of properties for the stream
+    */
+  private def getSystemStreamProperties(systemName: String, streamName: String) = {
+    if (systemName == null || streamName == null) {
+      Map()
+    }
+    config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
+  }
+
+  /**
+    * Gets the properties for the specified streamId from the config.
+    * It first applies any legacy configs from this config location:
+    * systems.{system}.streams.{stream}.*
+    *
+    * It then overrides them with properties of the new config format:
+    * streams.{streamId}.*
+    *
+    * @param streamId the identifier for the stream in the config.
+    * @return         the merged map of config properties from both the legacy and new config styles
+    */
+  private def getAllStreamProperties(streamId: String) = {
+    val allProperties = subset(StreamConfig.STREAM_ID_PREFIX format streamId)
+    val inheritedLegacyProperties:java.util.Map[String, String] = getSystemStreamProperties(getSystem(streamId), getPhysicalName(streamId))
+    new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, allProperties))
+  }
+
   private def getStreamIds(): Iterable[String] = {
-    subset(StreamConfig.STREAMS_PREFIX).keys
+    // StreamIds are not allowed to have '.' so the first index of '.' marks the end of the streamId.
+    subset(StreamConfig.STREAMS_PREFIX).keys.map(key => key.substring(0, key.indexOf(".")))
   }
 
   private def getStreamIdsForSystem(system: String): Iterable[String] = {

http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
new file mode 100644
index 0000000..639be3d
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStreamConfig.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.system.SystemStream;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestStreamConfig {
+  private static final String SYSTEM_STREAM_PATTERN = "systems.%s.streams.%s.";
+  private static final String STREAM_ID_PATTERN = "streams.%s.";
+
+  private static final String STREAM1_SYSTEM = "Sys1";
+  private static final String STREAM1_PHYSICAL_NAME = "Str1";
+  private static final String STREAM1_STREAM_ID = "streamId1";
+  private static final SystemStream SYSTEM_STREAM_1 = new SystemStream(STREAM1_SYSTEM, STREAM1_PHYSICAL_NAME);
+
+  private static final String STREAM2_SYSTEM = "Sys2";
+  private static final String STREAM2_PHYSICAL_NAME = "Str2";
+  private static final String STREAM2_STREAM_ID = "streamId2";
+  private static final SystemStream SYSTEM_STREAM_2 = new SystemStream(STREAM2_SYSTEM, STREAM2_PHYSICAL_NAME);
+
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetSamzaPropertyThrowsIfInvalidPropertyName() {
+    StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
+    config.getSamzaProperty(SYSTEM_STREAM_1, "key1");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetSamzaPropertyWithDefaultThrowsIfInvalidPropertyName() {
+    StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
+    config.getSamzaProperty(SYSTEM_STREAM_1, "key1", "default");
+  }
+
+  // 00
+  @Test
+  public void testGetSamzaPropertyDoesNotExist() {
+    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2");
+    assertNull(config.getSamzaProperty(SYSTEM_STREAM_1, "samza.keyNonExistent"));
+  }
+
+  // 01
+  @Test
+  public void testGetSamzaPropertyFromSystemStream() {
+    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2");
+    assertEquals("value1", config.getSamzaProperty(SYSTEM_STREAM_1, "samza.key1"));
+  }
+
+  // 10
+  @Test
+  public void testGetSamzaPropertyFromStreamId() {
+    StreamConfig config = buildConfig("key1", "value1",
+        buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2",
+        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME);
+    assertEquals("value2", config.getSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+  }
+
+  // 11
+  @Test
+  public void testGetSamzaPropertyFromSystemStreamAndStreamId() {
+    StreamConfig config = buildConfig("key1", "value1",
+        buildProp(SYSTEM_STREAM_1, "samza.key2"), "value2",
+        buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2OVERRIDE",
+        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME);
+    assertEquals("value2OVERRIDE", config.getSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testContainsSamzaPropertyThrowsIfInvalidPropertyName() {
+    StreamConfig config = buildConfig("key1", "value1", "key2", "value2");
+    config.containsSamzaProperty(new SystemStream("SysX", "StrX"), "key1");
+  }
+
+  // 00
+  @Test
+  public void testContainsSamzaPropertyDoesNotExist() {
+    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2");
+    assertFalse(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.keyNonExistent"));
+  }
+
+  // 01
+  @Test
+  public void testContainsSamzaPropertyFromSystemStream() {
+    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2");
+    assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key1"));
+  }
+
+  // 10
+  @Test
+  public void testContainsSamzaPropertyFromStreamId() {
+    StreamConfig config = buildConfig("key1", "value1",
+        buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2",
+        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME);
+    assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+  }
+
+  // 11
+  @Test
+  public void testContainsSamzaPropertyFromSystemStreamAndStreamId() {
+    StreamConfig config = buildConfig("key1", "value1",
+        buildProp(SYSTEM_STREAM_1, "samza.key2"), "value2",
+        buildProp(STREAM1_STREAM_ID, "samza.key2"), "value2OVERRIDE",
+        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME);
+    assertTrue(config.containsSamzaProperty(SYSTEM_STREAM_1, "samza.key2"));
+  }
+
+  // 00
+  @Test
+  public void testGetSerdeStreamsDoesNotExist() {
+    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, "samza.key1"), "value1", "key2", "value2");
+    assertTrue(config.getSerdeStreams(STREAM1_SYSTEM).isEmpty());
+  }
+
+  // 01
+  @Test
+  public void testGetSerdeStreamsFromSystemStream() {
+    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, StreamConfig.KEY_SERDE()), "value1",
+        buildProp(SYSTEM_STREAM_2, StreamConfig.MSG_SERDE()), "value2",
+        "key3", "value3");
+    assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
+    assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
+    assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
+    assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+  }
+
+  // 10
+  @Test
+  public void testGetSerdeStreamsFromStreamId() {
+    StreamConfig config = buildConfig(
+        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.KEY_SERDE()), "value1",
+
+        buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
+        buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM2_PHYSICAL_NAME,
+        buildProp(STREAM2_STREAM_ID, StreamConfig.MSG_SERDE()), "value2",
+        "key3", "value3");
+    assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
+    assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
+    assertEquals("value1", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
+    assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+  }
+
+  // 11
+  @Test
+  public void testGetSerdeStreamsFromSystemStreamAndStreamId() {
+    StreamConfig config = buildConfig(buildProp(SYSTEM_STREAM_1, StreamConfig.KEY_SERDE()), "value1",
+        buildProp(STREAM1_STREAM_ID, StreamConfig.SYSTEM()), STREAM1_SYSTEM,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM1_PHYSICAL_NAME,
+        buildProp(STREAM1_STREAM_ID, StreamConfig.KEY_SERDE()), "value1OVERRIDE",
+        buildProp(STREAM2_STREAM_ID, StreamConfig.SYSTEM()), STREAM2_SYSTEM,
+        buildProp(STREAM2_STREAM_ID, StreamConfig.PHYSICAL_NAME()), STREAM2_PHYSICAL_NAME,
+        buildProp(STREAM2_STREAM_ID, StreamConfig.MSG_SERDE()), "value2",
+        "key3", "value3");
+
+    assertEquals(1, config.getSerdeStreams(STREAM1_SYSTEM).size());
+    assertEquals(1, config.getSerdeStreams(STREAM2_SYSTEM).size());
+    assertEquals("value1OVERRIDE", config.getStreamKeySerde(SYSTEM_STREAM_1).get());
+    assertEquals("value2", config.getStreamMsgSerde(SYSTEM_STREAM_2).get());
+  }
+
+  private StreamConfig buildConfig(String... kvs) {
+    if (kvs.length % 2 != 0) {
+      throw new IllegalArgumentException("There must be parity between the keys and values");
+    }
+
+    Map<String, String> configMap = new HashMap<>();
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      configMap.put(kvs[i], kvs[i + 1]);
+    }
+    return new StreamConfig(new MapConfig(configMap));
+  }
+
+  private String buildProp(String streamId, String suffix) {
+    return String.format(STREAM_ID_PATTERN, streamId) + suffix;
+  }
+
+  private String buildProp(SystemStream systemStream, String suffix) {
+    return String.format(SYSTEM_STREAM_PATTERN, systemStream.getSystem(), systemStream.getStream()) + suffix;
+  }
+
+  private Config addConfigs(Config original, String... kvs) {
+    Map<String, String> result = new HashMap<>();
+    result.putAll(original);
+    result.putAll(buildConfig(kvs));
+    return new MapConfig(result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/daed7a9a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
index 8d7db9f..eeb783c 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+
 public class TestAbstractApplicationRunner {
   private static final String STREAM_ID = "t3st-Stream_Id";
   private static final String STREAM_ID_INVALID = "test#Str3amId!";