You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/03/07 00:18:57 UTC

[8/9] samza git commit: SAMZA-1096: StreamSpec constructors in the ExecutionEnvironments

SAMZA-1096: StreamSpec constructors in the ExecutionEnvironments

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Yi Pan (Data Infrastructure) <ni...@gmail.com>,Xinyu Liu <xi...@linkedin.com>,Navina Ramesh <na...@apache.org>

Closes #74 from jmakes/samza-1096


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e6c1eed4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e6c1eed4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e6c1eed4

Branch: refs/heads/samza-fluent-api-v1
Commit: e6c1eed4f1d576661abafce8477c1749c2554b39
Parents: d104013
Author: Jacob Maes <jm...@linkedin.com>
Authored: Mon Mar 6 14:52:18 2017 -0800
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Mon Mar 6 14:52:18 2017 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 225 +++++++++----
 .../documentation/versioned/jobs/logging.md     |   5 +
 .../samza/system/ExecutionEnvironment.java      |  32 +-
 .../org/apache/samza/system/StreamSpec.java     |  15 +-
 .../system/AbstractExecutionEnvironment.java    |  88 +++++
 .../system/RemoteExecutionEnvironment.java      |   6 +-
 .../system/StandaloneExecutionEnvironment.java  |   6 +-
 .../org/apache/samza/config/JobConfig.scala     |   3 +
 .../org/apache/samza/config/StreamConfig.scala  | 179 ++++++++--
 .../samza/example/TestBroadcastExample.java     |   9 +-
 .../apache/samza/example/TestJoinExample.java   |   2 +-
 .../apache/samza/example/TestWindowExample.java |   2 +-
 .../TestAbstractExecutionEnvironment.java       | 331 +++++++++++++++++++
 .../org/apache/samza/config/KafkaConfig.scala   |   8 +-
 .../apache/samza/config/Log4jSystemConfig.java  |  12 +-
 15 files changed, 810 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index a26bc43..ba04139 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -156,13 +156,20 @@
                         others' checkpoints, and perhaps interfere with each other in other ways.
                     </td>
                 </tr>
-                <tr>
                     <td class="property" id="job-coordinator-system">job.coordinator.system</td>
                     <td class="default"></td>
                     <td class="description">
                         <strong>Required:</strong> The <span class="system">system-name</span> to use for creating and maintaining the <a href="../container/coordinator-stream.html">Coordinator Stream</a>.
                     </td>
                 </tr>
+                <tr>
+                    <td class="property" id="job-default-system">job.default.system</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The <span class="system">system-name</span> to access any input or output streams for which the system is not explicitly configured.
+                        This property is for input and output streams, whereas job.coordinator.system is for Samza metadata streams.
+                    </td>
+                </tr>
 
                 <tr>
                     <td class="property" id="job-coordinator-replication-factor">job.coordinator.<br />replication.factor</td>
@@ -254,7 +261,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="job-systemstreampartition-matcher-class">job.systemstreampartition.matcher.class</td>
+                    <td class="property" id="job-systemstreampartition-matcher-class">job.systemstreampartition.<br>matcher.class</td>
                     <td class="default"></td>
                     <td class="description">
                         If you want to enable static partition assignment, then this is a <strong>required</strong> configuration.
@@ -286,7 +293,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="job_systemstreampartition_matcher_config_range">job.systemstreampartition.matcher.config.range</td>
+                    <td class="property" id="job_systemstreampartition_matcher_config_range">job.systemstreampartition.<br>matcher.config.range</td>
                     <td class="default"></td>
                     <td class="description">
                         If <code>job.systemstreampartition.matcher.class</code> is specified, and the value of this property is
@@ -305,7 +312,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="job_systemstreampartition_matcher_config_regex">job.systemstreampartition.matcher.config.regex</td>
+                    <td class="property" id="job_systemstreampartition_matcher_config_regex">job.systemstreampartition.<br>matcher.config.regex</td>
                     <td class="default"></td>
                     <td class="description">
                         If <code>job.systemstreampartition.matcher.class</code> is specified, and the value of this property is
@@ -316,8 +323,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="job_systemstreampartition_matcher_co
-                    nfig_job_factory_regex">job.systemstreampartition.matcher.config.job.factory.regex</td>
+                    <td class="property" id="job_systemstreampartition_matcher_config_job_factory_regex">job.systemstreampartition.<br>matcher.config.job.factory.regex</td>
                     <td class="default"></td>
                     <td class="description">
                         This configuration can be used to specify the Java supported regex to match the <code>StreamJobFactory</code>
@@ -683,7 +689,7 @@
                 </tr>
 
                 <tr>
-                    <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
+                    <th colspan="3" class="section" id="systems">Systems</th>
                 </tr>
 
                 <tr>
@@ -716,11 +722,12 @@
                 <tr>
                     <td class="property" id="systems-samza-key-serde">systems.<span class="system">system-name</span>.<br>samza.key.serde</td>
                     <td class="default" rowspan="2"></td>
-                    <td class="description" rowspan="2">
+                    <td class="description">
                         The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
                         <em>key</em> of messages on input streams, and to serialize the <em>key</em> of messages on
-                        output streams. This property can be defined either for an individual stream, or for all
-                        streams within a system (if both are defined, the stream-level definition takes precedence).
+                        output streams. This property defines the serde for all streams in the system. See the
+                        <a href="#streams-samza-key-serde">stream-scoped property</a> to define the serde for an
+                        individual stream. If both are defined, the stream-level definition takes precedence.
                         The value of this property must be a <span class="serde">serde-name</span> that is registered
                         with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
                         If this property is not set, messages are passed unmodified between the input stream consumer,
@@ -729,16 +736,21 @@
                 </tr>
                 <tr>
                     <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.key.serde</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-key-serde" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.key.serde</a>.
+                    </td>
                 </tr>
 
                 <tr>
                     <td class="property" id="systems-samza-msg-serde">systems.<span class="system">system-name</span>.<br>samza.msg.serde</td>
                     <td class="default" rowspan="2"></td>
-                    <td class="description" rowspan="2">
+                    <td class="description">
                         The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
                         <em>value</em> of messages on input streams, and to serialize the <em>value</em> of messages on
-                        output streams. This property can be defined either for an individual stream, or for all
-                        streams within a system (if both are defined, the stream-level definition takes precedence).
+                        output streams. This property defines the serde for all streams in the system. See the
+                        <a href="#streams-samza-msg-serde">stream-scoped property</a> to define the serde for an
+                        individual stream. If both are defined, the stream-level definition takes precedence.
                         The value of this property must be a <span class="serde">serde-name</span> that is registered
                         with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
                         If this property is not set, messages are passed unmodified between the input stream consumer,
@@ -747,12 +759,16 @@
                 </tr>
                 <tr>
                     <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.msg.serde</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-msg-serde" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.msg.serde</a>.
+                    </td>
                 </tr>
 
                 <tr>
                     <td class="property" id="systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>samza.offset.default</td>
                     <td class="default" rowspan="2">upcoming</td>
-                    <td class="description" rowspan="2">
+                    <td class="description">
                         If a container starts up without a <a href="../container/checkpointing.html">checkpoint</a>,
                         this property determines where in the input stream we should start consuming. The value must be an
                         <a href="../api/javadocs/org/apache/samza/system/SystemStreamMetadata.OffsetType.html">OffsetType</a>,
@@ -765,18 +781,113 @@
                             <dd>Start processing at the oldest available message in the system, and
                                 <a href="reprocessing.html">reprocess</a> the entire available message history.</dd>
                         </dl>
-                        This property can be defined either for an individual stream, or for all streams within a system
-                        (if both are defined, the stream-level definition takes precedence).
+                        This property is for all streams within a system. To set it for an individual stream, see
+                        <a href="#streams-samza-offset-default">streams.<span class="stream">stream-id</span>.<br>samza.offset.default</a>.
+                        If both are defined, the stream-level definition takes precedence.
                     </td>
                 </tr>
                 <tr>
                     <td class="property">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.offset.default</td>
+                    <td class="description">
+                        This is deprecated in favor of <a href="#streams-samza-offset-default" class="property">
+                        streams.<span class="stream">stream-id</span>.samza.offset.default</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
+                    <td class="default">1</td>
+                    <td class="description">
+                        If set to a positive integer, the task will try to consume
+                        <a href="../container/streams.html#batching">batches</a> with the given number of messages
+                        from each input stream, rather than consuming round-robin from all the input streams on
+                        each individual message. Setting this property can improve performance in some cases.
+                    </td>
+                </tr>
+
+                <tr>
+                    <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Streams</a></th>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-system">streams.<span class="stream">stream-id</span>.<br>samza.system</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The <span class="system">system-name</span> of the system on which this stream will be accessed.
+                        This property binds the stream to one of the systems defined with the property
+                        systems.<span class="system">system-name</span>.samza.factory. <br>
+                        If this property isn't specified, it is inherited from job.default.system.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-physical-name">streams.<span class="stream">stream-id</span>.<br>samza.physical.name</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The physical name of the stream on the system on which this stream will be accessed.
+                        This is opposed to the stream-id which is the logical name that Samza uses to identify the stream.
+                        A physical name could be a Kafka topic name, an HDFS file URN or any other system-specific identifier.
+                    </td>
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-streams-samza-reset-offset">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.reset.offset</td>
-                    <td>false</td>
-                    <td>
+                    <td class="property" id="streams-samza-key-serde">streams.<span class="stream">stream-id</span>.<br>samza.key.serde</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
+                        <em>key</em> of messages on input streams, and to serialize the <em>key</em> of messages on
+                        output streams. This property defines the serde for an individual stream. See the
+                        <a href="#systems-samza-key-serde">system-scoped property</a> to define the serde for all
+                        streams within a system. If both are defined, the stream-level definition takes precedence.
+                        The value of this property must be a <span class="serde">serde-name</span> that is registered
+                        with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
+                        If this property is not set, messages are passed unmodified between the input stream consumer,
+                        the task and the output stream producer.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-samza-msg-serde">streams.<span class="stream">stream-id</span>.<br>samza.msg.serde</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        The <a href="../container/serialization.html">serde</a> which will be used to deserialize the
+                        <em>value</em> of messages on input streams, and to serialize the <em>value</em> of messages on
+                        output streams. This property defines the serde for an individual stream. See the
+                        <a href="#systems-samza-msg-serde">system-scoped property</a> to define the serde for all
+                        streams within a system. If both are defined, the stream-level definition takes precedence.
+                        The value of this property must be a <span class="serde">serde-name</span> that is registered
+                        with <a href="#serializers-registry-class" class="property">serializers.registry.*.class</a>.
+                        If this property is not set, messages are passed unmodified between the input stream consumer,
+                        the task and the output stream producer.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-samza-offset-default">streams.<span class="stream">stream-id</span>.<br>samza.offset.default</td>
+                    <td class="default">upcoming</td>
+                    <td class="description">
+                        If a container starts up without a <a href="../container/checkpointing.html">checkpoint</a>,
+                        this property determines where in the input stream we should start consuming. The value must be an
+                        <a href="../api/javadocs/org/apache/samza/system/SystemStreamMetadata.OffsetType.html">OffsetType</a>,
+                        one of the following:
+                        <dl>
+                            <dt><code>upcoming</code></dt>
+                            <dd>Start processing messages that are published after the job starts. Any messages published while
+                                the job was not running are not processed.</dd>
+                            <dt><code>oldest</code></dt>
+                            <dd>Start processing at the oldest available message in the system, and
+                                <a href="reprocessing.html">reprocess</a> the entire available message history.</dd>
+                        </dl>
+                        This property is for an individual stream. To set it for all streams within a system, see
+                        <a href="#systems-samza-offset-default">systems.<span class="system">system-name</span>.<br>samza.offset.default</a>.
+                        If both are defined, the stream-level definition takes precedence.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="streams-streams-samza-reset-offset">streams.<span class="stream">stream-id</span>.<br>samza.reset.offset</td>
+                    <td class="default">false</td>
+                    <td class="description">
                         If set to <code>true</code>, when a Samza container starts up, it ignores any
                         <a href="../container/checkpointing.html">checkpointed offset</a> for this particular input
                         stream. Its behavior is thus determined by the <code>samza.offset.default</code> setting.
@@ -787,9 +898,9 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-streams-samza-priority">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.priority</td>
-                    <td>-1</td>
-                    <td>
+                    <td class="property" id="streams-streams-samza-priority">streams.<span class="stream">stream-id</span>.<br>samza.priority</td>
+                    <td class="default">-1</td>
+                    <td class="description">
                         If one or more streams have a priority set (any positive integer), they will be processed
                         with <a href="../container/streams.html#prioritizing-input-streams">higher priority</a> than the other streams.
                         You can set several streams to the same priority, or define multiple priority levels by
@@ -800,9 +911,9 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="systems-streams-samza-bootstrap">systems.<span class="system">system-name</span>.<br>streams.<span class="stream">stream-name</span>.<br>samza.bootstrap</td>
-                    <td>false</td>
-                    <td>
+                    <td class="property" id="streams-streams-samza-bootstrap">streams.<span class="stream">stream-id</span>.<br>samza.bootstrap</td>
+                    <td class="default">false</td>
+                    <td class="description">
                         If set to <code>true</code>, this stream will be processed as a
                         <a href="../container/streams.html#bootstrapping">bootstrap stream</a>. This means that every time
                         a Samza container starts up, this stream will be fully consumed before messages from any
@@ -811,13 +922,12 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
-                    <td>1</td>
-                    <td>
-                        If set to a positive integer, the task will try to consume
-                        <a href="../container/streams.html#batching">batches</a> with the given number of messages
-                        from each input stream, rather than consuming round-robin from all the input streams on
-                        each individual message. Setting this property can improve performance in some cases.
+                    <td class="property" id="streams-properties">streams.<span class="stream">stream-id</span>.*</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        Any properties of the stream. These are typically system-specific and can be used by the system
+                        for stream creation or validation. Note that the other properties are prefixed with <em>samza.</em>
+                        which distinguishes them as Samza properties that are not system-specific.
                     </td>
                 </tr>
 
@@ -835,6 +945,8 @@
                         <span class="serde">serde-name</span> you want, and reference that name in properties like
                         <a href="#systems-samza-key-serde" class="property">systems.*.samza.key.serde</a>,
                         <a href="#systems-samza-msg-serde" class="property">systems.*.samza.msg.serde</a>,
+                        <a href="#streams-samza-key-serde" class="property">streams.*.samza.key.serde</a>,
+                        <a href="#streams-samza-msg-serde" class="property">streams.*.samza.msg.serde</a>,
                         <a href="#stores-key-serde" class="property">stores.*.key.serde</a> and
                         <a href="#stores-msg-serde" class="property">stores.*.msg.serde</a>.
                         The value of this property is the fully-qualified name of a Java class that implements
@@ -1445,7 +1557,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-container-memory-mb">cluster-manager.container.memory.mb</td>
+                    <td class="property" id="cluster-manager-container-memory-mb">cluster-manager.container.<br>memory.mb</td>
                     <td class="default">1024</td>
                     <td class="description">
                         How much memory, in megabytes, to request from the cluster manager per container of your job. Along with
@@ -1461,7 +1573,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-container-cpu-cores">cluster-manager.container.cpu.cores</td>
+                    <td class="property" id="cluster-manager-container-cpu-cores">cluster-manager.container.<br>cpu.cores</td>
                     <td class="default">1</td>
                     <td class="description">
                         The number of CPU cores to request per container of your job. Each node in the
@@ -1501,7 +1613,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-jmx-enabled">cluster-manager.jobcoordinator.jmx.enabled</td>
+                    <td class="property" id="cluster-manager-jmx-enabled">cluster-manager.jobcoordinator.<br>jmx.enabled</td>
                     <td class="default">true</td>
                     <td class="description">
                         Determines whether a JMX server should be started on the job's JobCoordinator.
@@ -1510,7 +1622,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-allocator-sleep-ms">cluster-manager.allocator.sleep.ms</td>
+                    <td class="property" id="cluster-manager-allocator-sleep-ms">cluster-manager.allocator.<br>sleep.ms</td>
                     <td class="default">3600</td>
                     <td class="description">
                         The container allocator thread is responsible for matching requests to allocated containers.
@@ -1519,7 +1631,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="cluster-manager-container-request-timeout-ms">cluster-manager.container.request.timeout.ms</td>
+                    <td class="property" id="cluster-manager-container-request-timeout-ms">cluster-manager.container.<br>request.timeout.ms</td>
                     <td class="default">5000</td>
                     <td class="description">
                         The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource.
@@ -1758,10 +1870,9 @@
                         <a href="../container/metrics.html">JSON encoding</a> for metrics; in order to use this
                         encoding, you also need to configure a serde for the metrics stream:
                         <ul>
-                            <li><a href="#systems-samza-msg-serde" class="property">systems.*.streams.*.samza.msg.serde</a>
-                                <code>= metrics-serde</code> (replacing the asterisks with the
-                                <span class="system">system-name</span> and <span class="stream">stream-name</span>
-                                of the metrics stream)</li>
+                            <li><a href="#streams-samza-msg-serde" class="property">streams.*.samza.msg.serde</a>
+                                <code>= metrics-serde</code> (replacing the asterisk with the
+                                <span class="stream">stream-id</span> of the metrics stream)</li>
                             <li><a href="#serializers-registry-class" class="property">serializers.registry.metrics-serde.class</a>
                                 <code>= org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code>
                                 (registering the serde under a <span class="serde">serde-name</span> of
@@ -1789,37 +1900,37 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="hdfs-writer-class">systems.*.producer.hdfs.writer.class</td>
-                    <td class="default">org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter</td>
+                    <td class="property" id="hdfs-writer-class">systems.<span class="system">system-name</span>.<br>producer.hdfs.writer.class</td>
+                    <td class="default">org.apache.samza.system.hdfs.<br>writer.BinarySequenceFileHdfsWriter</td>
                     <td class="description">Fully-qualified class name of the HdfsWriter implementation this HDFS Producer system should use</td>
                 </tr>
                 <tr>
-                  <td class="property" id="hdfs-compression-type">systems.*.producer.hdfs.compression.type</td>
+                  <td class="property" id="hdfs-compression-type">systems.<span class="system">system-name</span>.<br>producer.hdfs.compression.type</td>
                     <td class="default">none</td>
                     <td class="description">A human-readable label for the compression type to use, such as "gzip" "snappy" etc. This label will be interpreted differently (or ignored) depending on the nature of the HdfsWriter implementation.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-base-output-dir">systems.*.producer.hdfs.base.output.dir</td>
+                    <td class="property" id="hdfs-base-output-dir">systems.<span class="system">system-name</span>.<br>producer.hdfs.base.output.dir</td>
                     <td class="default">/user/USERNAME/SYSTEMNAME</td>
                     <td class="description">The base output directory for HDFS writes. Defaults to the home directory of the user who ran the job, followed by the systemName for this HdfsSystemProducer as defined in the job.properties file.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-bucketer-class">systems.*.producer.hdfs.bucketer.class</td>
-                    <td class="default">org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer</td>
+                    <td class="property" id="hdfs-bucketer-class">systems.<span class="system">system-name</span>.<br>producer.hdfs.bucketer.class</td>
+                    <td class="default">org.apache.samza.system.hdfs.<br>writer.JobNameDateTimeBucketer</td>
                     <td class="description">Fully-qualified class name of the Bucketer implementation that will manage HDFS paths and file names. Used to batch writes by time, or other similar partitioning methods.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-bucketer-date-path-format">systems.*.producer.hdfs.bucketer.date.path.format</td>
+                    <td class="property" id="hdfs-bucketer-date-path-format">systems.<span class="system">system-name</span>.<br>producer.hdfs.bucketer.date.path.format</td>
                     <td class="default"yyyy_MM_dd></td>
                     <td class="description">A date format (using Java's SimpleDataFormat syntax) appropriate for use in an HDFS Path, which can configure time-based bucketing of output files.</td>
                 </tr>
                 <tr>
-                  <td class="property" id="hdfs-write-batch-size-bytes">systems.*.producer.hdfs.write.batch.size.bytes</td>
+                  <td class="property" id="hdfs-write-batch-size-bytes">systems.<span class="system">system-name</span>.<br>producer.hdfs.write.batch.size.bytes</td>
                   <td class="default">268435456</td>
                   <td class="description">The number of bytes of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 256MB if not set.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-write-batch-size-records">systems.*.producer.hdfs.write.batch.size.records</td>
+                    <td class="property" id="hdfs-write-batch-size-records">systems.<span class="system">system-name</span>.<br>producer.hdfs.write.batch.size.records</td>
                     <td class="default">262144</td>
                     <td class="description">The number of outgoing messages to write to each HDFS output file before cutting a new file. Defaults to 262144 if not set.</td>
                 </tr>
@@ -1829,37 +1940,37 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="hdfs-consumer-buffer-capacity">systems.*.consumer.bufferCapacity</td>
+                    <td class="property" id="hdfs-consumer-buffer-capacity">systems.<span class="system">system-name</span>.<br>consumer.bufferCapacity</td>
                     <td class="default">10</td>
                     <td class="description">Capacity of the hdfs consumer buffer - the blocking queue used for storing messages. Larger buffer capacity typically leads to better throughput but consumes more memory.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-consumer-numMaxRetries">systems.*.consumer.numMaxRetries</td>
+                    <td class="property" id="hdfs-consumer-numMaxRetries">systems.<span class="system">system-name</span>.<br>consumer.numMaxRetries</td>
                     <td class="default">10</td>
                     <td class="description">The number of retry attempts when there is a failure to fetch messages from HDFS, before the container fails.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-partitioner-whitelist">systems.*.partitioner.defaultPartitioner.whitelist</td>
+                    <td class="property" id="hdfs-partitioner-whitelist">systems.<span class="system">system-name</span>.<br>partitioner.defaultPartitioner.whitelist</td>
                     <td class="default">.*</td>
                     <td class="description">White list used by directory partitioner to select files in a hdfs directory, in Java Pattern style.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-partitioner-blacklist">systems.*.partitioner.defaultPartitioner.blacklist</td>
+                    <td class="property" id="hdfs-partitioner-blacklist">systems.<span class="system">system-name</span>.<br>partitioner.defaultPartitioner.blacklist</td>
                     <td class="default"></td>
                     <td class="description">Black list used by directory partitioner to filter out unwanted files in a hdfs directory, in Java Pattern style.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-partitioner-group-pattern">systems.*.partitioner.defaultPartitioner.groupPattern</td>
+                    <td class="property" id="hdfs-partitioner-group-pattern">systems.<span class="system">system-name</span>.<br>partitioner.defaultPartitioner.groupPattern</td>
                     <td class="default"></td>
                     <td class="description">Group pattern used by directory partitioner for advanced partitioning. The advanced partitioning goes beyond the basic assumption that each file is a partition. With advanced partitioning you can group files into partitions arbitrarily. For example, if you have a set of files as [part-01-a.avro, part-01-b.avro, part-02-a.avro, part-02-b.avro, part-03-a.avro], and you want to organize the partitions as (part-01-a.avro, part-01-b.avro), (part-02-a.avro, part-02-b.avro), (part-03-a.avro), where the numbers in the middle act as a "group identifier", you can then set this property to be "part-[id]-.*" (note that "[id]" is a reserved term here, i.e. you have to literally put it as "[id]"). The partitioner will apply this pattern to all file names and extract the "group identifier" ("[id]" in the pattern), then use the "group identifier" to group files into partitions. See more details in <a href="https://issues.apache.org/jira/secure/attachment/12827670/HDFSSystemConsumer.pdf">HdfsSystemConsumer design doc</a> </td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-consumer-reader-type">systems.*.consumer.reader</td>
+                    <td class="property" id="hdfs-consumer-reader-type">systems.<span class="system">system-name</span>.<br>consumer.reader</td>
                     <td class="default">avro</td>
                     <td class="description">Type of the file reader for different event formats (avro, plain, json, etc.). "avro" is the only type supported for now.</td>
                 </tr>
                 <tr>
-                    <td class="property" id="hdfs-staging-directory">systems.*.stagingDirectory</td>
+                    <td class="property" id="hdfs-staging-directory">systems.<span class="system">system-name</span>.<br>stagingDirectory</td>
                     <td class="default"></td>
                     <td class="description">Staging directory for storing partition description. By default (if not set by users) the value is inherited from "yarn.job.staging.directory" internally. The default value is typically good enough unless you explicitly want to use a separate location.</td>
                 </tr>

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md
index 6d65984..44eeb3c 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -139,6 +139,11 @@ to log4j.xml and define the system name by specifying the config:
 task.log4j.system="<system-name>"
 {% endhighlight %}
 
+The default stream name for the logger is generated using the following convention, though you can override it using the `StreamName` property in the log4j.xml as shown above.
+```java
+"__samza_%s_%s_logs" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+```
+
 Configuring the StreamAppender will automatically encode messages using logstash's [Log4J JSON format](https://github.com/logstash/log4j-jsonevent-layout). Samza also supports pluggable serialization for those that prefer non-JSON logging events. This can be configured the same way other stream serializers are defined:
 
 {% highlight jproperties %}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
index ad37eb3..8444c91 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.system;
 
+import java.lang.reflect.Constructor;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.operators.StreamGraphBuilder;
@@ -26,6 +27,9 @@ import org.apache.samza.config.Config;
 
 /**
  * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
+ *
+ * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
+ * to support the {@link ExecutionEnvironment#fromConfig(Config)} static constructor.
  */
 @InterfaceStability.Unstable
 public interface ExecutionEnvironment {
@@ -46,13 +50,17 @@ public interface ExecutionEnvironment {
   /**
    * Static method to load the non-standalone environment.
    *
+   * Requires the implementation class to define a constructor with a single {@link Config} as the argument.
+   *
    * @param config  configuration passed in to initialize the Samza processes
    * @return  the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications
    */
   static ExecutionEnvironment fromConfig(Config config) {
     try {
-      if (ExecutionEnvironment.class.isAssignableFrom(Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)))) {
-        return (ExecutionEnvironment) Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)).newInstance();
+      Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS));
+      if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) {
+        Constructor<?> constructor = environmentClass.getConstructor(Config.class); // *sigh*
+        return (ExecutionEnvironment) constructor.newInstance(config);
       }
     } catch (Exception e) {
       throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
@@ -70,4 +78,24 @@ public interface ExecutionEnvironment {
    */
   void run(StreamGraphBuilder graphBuilder, Config config);
 
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   * <br>
+   * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
+   * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
+   *
+   * <ul>
+   *   <li>samza.system -         The name of the System on which this stream will be used. If this property isn't defined
+   *                              the stream will be associated with the System defined in {@code job.default.system}</li>
+   *   <li>samza.physical.name -  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   *                              If this property isn't defined the physical.name will be set to the streamId</li>
+   * </ul>
+   *
+   * @param streamId  The logical identifier for the stream in Samza.
+   * @return          The {@link StreamSpec} instance.
+   */
+  StreamSpec streamFromConfig(String streamId);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index e953d46..5711a8b 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -137,13 +137,8 @@ public class StreamSpec {
    * @param config          A map of properties for the stream. These may be System-specific.
    */
   public StreamSpec(String id, String physicalName, String systemName, int partitionCount,  Map<String, String> config) {
-    if (id == null) {
-      throw new NullPointerException("Parameter 'id' must not be null");
-    }
-
-    if (systemName == null) {
-      throw new NullPointerException("Parameter 'systemName' must not be null");
-    }
+    validateLogicalIdentifier("id", id);
+    validateLogicalIdentifier("systemName", systemName);
 
     if (partitionCount < 1) {
       throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0");
@@ -200,4 +195,10 @@ public class StreamSpec {
   public String getOrDefault(String propertyName, String defaultValue) {
     return config.getOrDefault(propertyName, defaultValue);
   }
+
+  private void validateLogicalIdentifier(String identifierName, String identifierValue) {
+    if (!identifierValue.matches("[A-Za-z0-9_-]+")) {
+      throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
new file mode 100644
index 0000000..64d60b7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
+
+
+public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {
+
+  private final Config config;
+
+  public AbstractExecutionEnvironment(Config config) {
+    if (config == null) {
+      throw new NullPointerException("Parameter 'config' cannot be null.");
+    }
+
+    this.config = config;
+  }
+
+  @Override
+  public StreamSpec streamFromConfig(String streamId) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    String physicalName = streamConfig.getPhysicalName(streamId, streamId);
+
+    return streamFromConfig(streamId, physicalName);
+  }
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   * <br>
+   * All properties matching this pattern are assumed to be system-specific with one exception. The following
+   * property is a Samza property which is used to bind the stream to a system.
+   *
+   * <ul>
+   *   <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
+   *                      the stream will be associated with the System defined in {@code job.default.system}</li>
+   * </ul>
+   *
+   * @param streamId      The logical identifier for the stream in Samza.
+   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   * @return              The {@link StreamSpec} instance.
+   */
+  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    String system = streamConfig.getSystem(streamId);
+
+    return streamFromConfig(streamId, physicalName, system);
+  }
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   *
+   * @param streamId      The logical identifier for the stream in Samza.
+   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   * @param system        The name of the System on which this stream will be used.
+   * @return              The {@link StreamSpec} instance.
+   */
+  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    Map<String, String> properties = streamConfig.getStreamProperties(streamId);
+
+    return new StreamSpec(streamId, physicalName, system, properties);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index fafa2cb..e592e66 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -24,7 +24,11 @@ import org.apache.samza.config.Config;
 /**
  * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
  */
-public class RemoteExecutionEnvironment implements ExecutionEnvironment {
+public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
+
+  public RemoteExecutionEnvironment(Config config) {
+    super(config);
+  }
 
   @Override public void run(StreamGraphBuilder app, Config config) {
     // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
index f0f6ef2..71d60ef 100644
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -28,7 +28,11 @@ import org.apache.samza.operators.StreamGraphImpl;
 /**
  * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
  */
-public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
+public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment {
+
+  public StandaloneExecutionEnvironment(Config config) {
+    super(config);
+  }
 
   // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
   StreamGraph createGraph(StreamGraphBuilder app, Config config) {

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index b64e406..9d6cbc2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -43,6 +43,7 @@ object JobConfig {
   val SAMZA_FWK_PATH = "samza.fwk.path"
   val SAMZA_FWK_VERSION = "samza.fwk.version"
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
+  val JOB_DEFAULT_SYSTEM = "job.default.system"
   val JOB_CONTAINER_COUNT = "job.container.count"
   val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
@@ -104,6 +105,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
       throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
 
+  def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM)
+
   def getContainerCount = {
     getOption(JobConfig.JOB_CONTAINER_COUNT) match {
       case Some(count) => count.toInt

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 0ccc7df..6a3ed4b 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -19,64 +19,189 @@
 
 package org.apache.samza.config
 
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.system.SystemStream
 import org.apache.samza.util.Logging
+
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStream
 
 object StreamConfig {
-  // stream config constants
+  // Samza configs for streams
+  val SAMZA_PROPERTY = "samza."
+  val SYSTEM =                  SAMZA_PROPERTY + "system"
+  val PHYSICAL_NAME =           SAMZA_PROPERTY + "physical.name"
+  val MSG_SERDE =               SAMZA_PROPERTY + "msg.serde"
+  val KEY_SERDE =               SAMZA_PROPERTY + "key.serde"
+  val CONSUMER_RESET_OFFSET =   SAMZA_PROPERTY + "reset.offset"
+  val CONSUMER_OFFSET_DEFAULT = SAMZA_PROPERTY + "offset.default"
+
+  val STREAMS_PREFIX = "streams."
+  val STREAM_ID_PREFIX = STREAMS_PREFIX + "%s."
+  val SYSTEM_FOR_STREAM_ID = STREAM_ID_PREFIX + SYSTEM
+  val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_ID_PREFIX + PHYSICAL_NAME
+  val SAMZA_STREAM_PROPERTY_PREFIX = STREAM_ID_PREFIX + SAMZA_PROPERTY
+
   val STREAM_PREFIX = "systems.%s.streams.%s."
-  val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
-  val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
-  val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
-  val CONSUMER_OFFSET_DEFAULT = STREAM_PREFIX + "samza.offset.default"
 
   implicit def Config2Stream(config: Config) = new StreamConfig(config)
 }
 
 class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
-  def getStreamMsgSerde(systemStream: SystemStream) =
-    getNonEmptyOption(StreamConfig.MSG_SERDE format (systemStream.getSystem, systemStream.getStream))
-
-  def getStreamKeySerde(systemStream: SystemStream) =
-    getNonEmptyOption(StreamConfig.KEY_SERDE format (systemStream.getSystem, systemStream.getStream))
+  def getStreamMsgSerde(systemStream: SystemStream) = nonEmptyOption(getProperty(systemStream, StreamConfig.MSG_SERDE))
 
-  def getResetOffsetMap(systemName: String) = {
-    val subConf = config.subset("systems.%s.streams." format systemName, true)
-    subConf
-      .filterKeys(k => k.endsWith(".samza.reset.offset"))
-      .map(kv => {
-        val streamName = kv._1.replace(".samza.reset.offset", "")
-        val systemStream = new SystemStream(systemName, streamName)
-        val resetVal = getResetOffset(systemStream)
-        (systemStream, resetVal)
-      }).toMap
-  }
+  def getStreamKeySerde(systemStream: SystemStream) = nonEmptyOption(getProperty(systemStream, StreamConfig.KEY_SERDE))
 
   def getResetOffset(systemStream: SystemStream) =
-    getOption(StreamConfig.CONSUMER_RESET_OFFSET format (systemStream.getSystem, systemStream.getStream)) match {
+    Option(getProperty(systemStream, StreamConfig.CONSUMER_RESET_OFFSET)) match {
       case Some("true") => true
       case Some("false") => false
       case Some(resetOffset) =>
-        warn("Got a configuration for %s that is not true, or false (was %s). Defaulting to false." format (StreamConfig.CONSUMER_RESET_OFFSET format (systemStream.getSystem, systemStream.getStream), resetOffset))
+        warn("Got a .samza.reset.offset configuration for SystemStream %s that is not true, or false (was %s). Defaulting to false."
+          format (systemStream.toString format (systemStream.getSystem, systemStream.getStream), resetOffset))
         false
       case _ => false
     }
 
   def getDefaultStreamOffset(systemStream: SystemStream) =
-    getOption(StreamConfig.CONSUMER_OFFSET_DEFAULT format (systemStream.getSystem, systemStream.getStream))
+    Option(getProperty(systemStream, StreamConfig.CONSUMER_OFFSET_DEFAULT))
 
   /**
    * Returns a list of all SystemStreams that have a serde defined from the config file.
    */
   def getSerdeStreams(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
-    subConf
+    val legacySystemStreams = subConf
       .keys
-      .filter(k => k.endsWith(".samza.msg.serde") || k.endsWith(".samza.key.serde"))
+      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
       .map(k => {
         val streamName = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
         new SystemStream(systemName, streamName)
       }).toSet
+
+    val systemStreams = subset(StreamConfig.STREAMS_PREFIX)
+      .keys
+      .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
+      .map(k => {
+        val streamId = k.substring(0, k.length - 16 /* .samza.XXX.serde length */ )
+        streamIdToSystemStream(streamId)
+      }).toSet
+
+    legacySystemStreams.union(systemStreams)
+  }
+
+  /**
+    * Gets the stream properties from the legacy config style:
+    * systems.{system}.streams.{stream}.*
+    *
+    * @param systemName the system name under which the properties are configured
+    * @param streamName the stream name
+    * @return           the map of properties for the stream
+    */
+  private def getSystemStreamProperties(systemName: String, streamName: String) = {
+    if (systemName == null || streamName == null) {
+      Map()
+    }
+    config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
+  }
+
+  /**
+    * Gets the properties for the specified streamId from the config.
+    * It first applies any legacy configs from this config location:
+    * systems.{system}.streams.{stream}.*
+    *
+    * It then overrides them with properties of the new config format:
+    * streams.{streamId}.*
+    *
+    * @param streamId the identifier for the stream in the config.
+    * @return         the merged map of config properties from both the legacy and new config styles
+    */
+  def getStreamProperties(streamId: String) = {
+    val allProperties = subset(StreamConfig.STREAM_ID_PREFIX format streamId)
+    val samzaProperties = allProperties.subset(StreamConfig.SAMZA_PROPERTY, false)
+    val filteredStreamProperties:java.util.Map[String, String] = allProperties.filterKeys(k => !samzaProperties.containsKey(k))
+    val inheritedLegacyProperties:java.util.Map[String, String] = getSystemStreamProperties(getSystem(streamId), getPhysicalName(streamId, streamId))
+    new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, filteredStreamProperties))
+  }
+
+  /**
+    * Gets the System associated with the specified streamId.
+    * It first looks for the property
+    * streams.{streamId}.samza.system
+    *
+    * If no value was provided, it uses
+    * job.default.system
+    *
+    * @param streamId the identifier for the stream in the config.
+    * @return         the system name associated with the stream.
+    */
+  def getSystem(streamId: String) = {
+    getOption(StreamConfig.SYSTEM_FOR_STREAM_ID format streamId) match {
+      case Some(system) => system
+      case _ => config.getDefaultSystem.orNull
+    }
+  }
+
+  /**
+    * Gets the physical name for the specified streamId.
+    *
+    * @param streamId             the identifier for the stream in the config.
+    * @param defaultPhysicalName  the default to use if the physical name is missing.
+    * @return                     the physical identifier for the stream or the default if it is undefined.
+    */
+  def getPhysicalName(streamId: String, defaultPhysicalName: String) = {
+    get(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID format streamId, defaultPhysicalName)
+  }
+
+  /**
+    * Gets the specified property for a SystemStream.
+    *
+    * Note, because the translation is not perfect between SystemStream and streamId,
+    * this method is not identical to getProperty(streamId, property)
+    */
+  private def getProperty(systemStream: SystemStream, property: String): String = {
+    val streamVal = getStreamProperties(systemStreamToStreamId(systemStream)).get(property)
+
+    if (streamVal != null) {
+      streamVal
+    } else {
+      getSystemStreamProperties(systemStream.getSystem(), systemStream.getStream).get(property)
+    }
+  }
+
+  private def getStreamIds(): Iterable[String] = {
+    subset(StreamConfig.STREAMS_PREFIX).keys
+  }
+
+  private def getStreamIdsForSystem(system: String): Iterable[String] = {
+    getStreamIds().filter(streamId => system.equals(getSystem(streamId)))
+  }
+
+  private def systemStreamToStreamId(systemStream: SystemStream): String = {
+   val streamIds = getStreamIdsForSystem(systemStream.getSystem).filter(streamId => systemStream.getStream().equals(getPhysicalName(streamId, streamId)))
+    if (streamIds.size > 1) {
+      throw new IllegalStateException("There was more than one stream found for system stream %s" format(systemStream))
+    }
+
+    if (streamIds.isEmpty) {
+      null
+    } else {
+      streamIds.head
+    }
+  }
+
+  /**
+    * A streamId is translated to a SystemStream by looking up its System and physicalName. It
+    * will use the streamId as the stream name if the physicalName doesn't exist.
+    */
+  private def streamIdToSystemStream(streamId: String): SystemStream = {
+    new SystemStream(getSystem(streamId), getPhysicalName(streamId, streamId))
+  }
+
+  private def nonEmptyOption(value: String): Option[String] = {
+    if (value == null || value.isEmpty) {
+      None
+    } else {
+      Some(value)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
index 059afce..d988270 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.example;
 
+import java.time.Duration;
+import java.util.Set;
+import java.util.function.BiFunction;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
@@ -30,10 +33,6 @@ import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.time.Duration;
-import java.util.function.BiFunction;
-import java.util.Set;
-
 
 /**
  * Example implementation of split stream tasks
@@ -69,7 +68,7 @@ public class TestBroadcastExample extends TestExampleBase {
     BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
     inputs.keySet().forEach(entry -> {
         MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
-                new StreamSpec(entry.toString(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
+                new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
 
         inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
             .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
index cc53814..f956972 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -64,7 +64,7 @@ public class TestJoinExample  extends TestExampleBase {
   public void init(StreamGraph graph, Config config) {
 
     for (SystemStream input : inputs.keySet()) {
-      StreamSpec inputStreamSpec = new StreamSpec(input.toString(), input.getStream(), input.getSystem());
+      StreamSpec inputStreamSpec = new StreamSpec(input.getSystem() + "-" + input.getStream(), input.getStream(), input.getSystem());
       MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
           inputStreamSpec, null, null).map(this::getInputMessage);
       if (joinOutput == null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
index 73f4674..6896da5 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -59,7 +59,7 @@ public class TestWindowExample extends TestExampleBase {
   public void init(StreamGraph graph, Config config) {
     BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
     inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
-            new StreamSpec(source.toString(), source.getStream(), source.getSystem()), null, null).
+            new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null).
         map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
             m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
 

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
new file mode 100644
index 0000000..861f049
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+public class TestAbstractExecutionEnvironment {
+  private static final String STREAM_ID = "t3st-Stream_Id";
+  private static final String STREAM_ID_INVALID = "test#Str3amId!";
+
+  private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
+  private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
+  private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?";
+
+  private static final String TEST_SYSTEM = "t3st-System_Name";
+  private static final String TEST_SYSTEM2 = "testSystemName2";
+  private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
+
+  private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
+
+
+  @Test(expected = NullPointerException.class)
+  public void testConfigValidation() {
+    new TestAbstractExecutionEnvironmentImpl(null);
+  }
+
+  // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
+  @Test
+  public void testStreamFromConfigWithPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+  }
+
+  // The streamId should be used as the physicalName when the physical name is not specified.
+  // NOTE: it's either this, setting it to null, or throwing an exception. This seems better for backward compatibility and API brevity.
+  @Test
+  public void testStreamFromConfigWithoutPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(STREAM_ID, spec.getPhysicalName());
+  }
+
+  // If the system is specified at the stream scope, use it
+  @Test
+  public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // If system isn't specified at stream scope, use the default system
+  @Test
+  public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+                                                  StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
+                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
+  }
+
+  // Stream scope should override default scope
+  @Test
+  public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+                                                StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                                StreamConfig.SYSTEM(), TEST_SYSTEM),
+                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // System is required. Throw if it cannot be determined.
+  @Test(expected = Exception.class)
+  public void testStreamFromConfigWithOutSystemInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
+  @Test
+  public void testStreamFromConfigPropertiesPassthrough() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                    StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
+                                    "systemProperty1", "systemValue1",
+                                    "systemProperty2", "systemValue2",
+                                    "systemProperty3", "systemValue3");
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    assertEquals("systemValue1", properties.get("systemProperty1"));
+    assertEquals("systemValue2", properties.get("systemProperty2"));
+    assertEquals("systemValue3", properties.get("systemProperty3"));
+    assertEquals("systemValue1", spec.get("systemProperty1"));
+    assertEquals("systemValue2", spec.get("systemProperty2"));
+    assertEquals("systemValue3", spec.get("systemProperty3"));
+  }
+
+  // The Samza-specific stream properties (physical name and system), which are invalid for the underlying system, should be filtered out.
+  @Test
+  public void testStreamFromConfigSamzaPropertiesOmitted() {
+    Config config = buildStreamConfig(STREAM_ID,
+                              StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
+                                    "systemProperty1", "systemValue1",
+                                    "systemProperty2", "systemValue2",
+                                    "systemProperty3", "systemValue3");
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
+  }
+
+  // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
+  @Test
+  public void testStreamFromConfigPhysicalNameArgSimple() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are allowed for the physical name
+  @Test
+  public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
+    assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
+  }
+
+  // Null is allowed for the physical name
+  @Test
+  public void testStreamFromConfigPhysicalNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
+    assertNull(spec.getPhysicalName());
+  }
+
+  // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
+  @Test
+  public void testStreamFromConfigSystemNameArgValid() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);              // This too
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are NOT allowed for system name, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigSystemNameArgInvalid() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
+  }
+
+  // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigSystemNameArgEmpty() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+        StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, "");
+  }
+
+  // Null is not allowed for system name.
+  @Test(expected = NullPointerException.class)
+  public void testStreamFromConfigSystemNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
+  }
+
+  // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigStreamIdInvalid() {
+    Config config = buildStreamConfig(STREAM_ID_INVALID,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(STREAM_ID_INVALID);
+  }
+
+  // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigStreamIdEmpty() {
+    Config config = buildStreamConfig("",
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig("");
+  }
+
+  // Null is not allowed for streamId.
+  @Test(expected = NullPointerException.class)
+  public void testStreamFromConfigStreamIdNull() {
+    Config config = buildStreamConfig(null,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    env.streamFromConfig(null);
+  }
+
+
+  // Helper methods
+
+  private Config buildStreamConfig(String streamId, String... kvs) {
+    // Prepend the stream-scoped prefix (streams.{streamId}.) to each key; keys sit at the even indices of kvs.
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i];
+    }
+    return buildConfig(kvs);
+  }
+
+  private Config buildConfig(String... kvs) {
+    if (kvs.length % 2 != 0) {
+      throw new IllegalArgumentException("There must be parity between the keys and values");
+    }
+
+    Map<String, String> configMap = new HashMap<>();
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      configMap.put(kvs[i], kvs[i + 1]);
+    }
+    return new MapConfig(configMap);
+  }
+
+  private Config addConfigs(Config original, String... kvs) {
+    Map<String, String> result = new HashMap<>();
+    result.putAll(original);
+    result.putAll(buildConfig(kvs));
+    return new MapConfig(result);
+  }
+
+  private class TestAbstractExecutionEnvironmentImpl extends AbstractExecutionEnvironment {
+
+    public TestAbstractExecutionEnvironmentImpl(Config config) {
+      super(config);
+    }
+
+    @Override
+    public void run(StreamGraphBuilder graphBuilder, Config config) {
+      // do nothing
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index e355e7e..7e9f18a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -36,6 +36,7 @@ import java.util
 import scala.collection.JavaConverters._
 import org.apache.samza.system.kafka.KafkaSystemFactory
 import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.kafka.common.serialization.ByteArraySerializer
 
 object KafkaConfig {
@@ -165,13 +166,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     kafkaChangeLogProperties
   }
 
-  def getTopicKafkaProperties(systemName: String, streamName: String) = {
-    val filteredConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
-    val topicProperties = new Properties
-    filteredConfigs.foreach { kv => topicProperties.setProperty(kv._1, kv._2) }
-    topicProperties
-  }
-
   // kafka config
   def getKafkaSystemConsumerConfig( systemName: String,
                                     clientId: String,

http://git-wip-us.apache.org/repos/asf/samza/blob/e6c1eed4/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 59015a9..0f0d792 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.config;
 
+import org.apache.samza.system.SystemStream;
+
+
 /**
  * This class contains the methods for getting properties that are needed by the
  * StreamAppender.
@@ -36,7 +39,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
    * Defines whether or not to include file location information for Log4J
    * appender messages. File location information includes the method, line
    * number, class, etc.
-   * 
+   *
    * @return If true, will include file location (method, line number, etc)
    *         information in Log4J appender messages.
    */
@@ -68,7 +71,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
 
   /**
    * Get the class name according to the serde name.
-   * 
+   *
    * @param name serde name
    * @return serde factory name, or null if there is no factory defined for the
    *         supplied serde name.
@@ -78,7 +81,8 @@ public class Log4jSystemConfig extends JavaSystemConfig {
   }
 
   public String getStreamSerdeName(String systemName, String streamName) {
-    String streamSerdeNameConfig = String.format(StreamConfig.MSG_SERDE(), systemName, streamName);
-    return get(streamSerdeNameConfig, null);
+    StreamConfig streamConfig =  new StreamConfig(this);
+    scala.Option<String> option = streamConfig.getStreamMsgSerde(new SystemStream(systemName, streamName));
+    return option.isEmpty() ? null : option.get();
   }
 }