You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/07 23:02:43 UTC

samza git commit: SAMZA-1122; Rename ExecutionEnvironment to ApplicationRunner

Repository: samza
Updated Branches:
  refs/heads/master e6c1eed4f -> 1956dac94


SAMZA-1122; Rename ExecutionEnvironment to ApplicationRunner

Some refactoring/cleanup:

- rename ExecutionEnvironment to ApplicationRunner, including all the subclasses.
- rename the package to be org.apache.samza.runtime
- rename the StandaloneExecutionEnvironment to be LocalApplicationRunner

Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #76 from xinyuiscool/SAMZA-1122 and squashes the following commits:

cff5206 [Xinyu Liu] Merge branch 'SAMZA-1122' of https://github.com/xinyuiscool/samza into SAMZA-1122
c341d3d [Xinyu Liu] SAMZA-1122: Rename ExecutionEnvironment to ApplicationRunner
6a71205 [Xinyu Liu] SAMZA-1122: Rename ExecutionEnvironment to ApplicationRunner


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1956dac9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1956dac9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1956dac9

Branch: refs/heads/master
Commit: 1956dac94b2c41e3e99a0e47d1f5704882b10bbd
Parents: e6c1eed
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue Mar 7 15:02:30 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Mar 7 15:02:30 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/runtime/ApplicationRunner.java | 103 ++++++
 .../samza/system/ExecutionEnvironment.java      | 101 ------
 .../runtime/AbstractApplicationRunner.java      |  89 +++++
 .../samza/runtime/LocalApplicationRunner.java   |  54 +++
 .../samza/runtime/RemoteApplicationRunner.java  |  42 +++
 .../system/AbstractExecutionEnvironment.java    |  88 -----
 .../system/RemoteExecutionEnvironment.java      |  41 ---
 .../system/StandaloneExecutionEnvironment.java  |  54 ---
 .../samza/example/KeyValueStoreExample.java     |  17 +-
 .../samza/example/NoContextStreamExample.java   |  15 +-
 .../samza/example/OrderShipmentJoinExample.java |  15 +-
 .../samza/example/PageViewCounterExample.java   |   6 +-
 .../samza/example/RepartitionExample.java       |  19 +-
 .../TestAbstractExecutionEnvironment.java       | 333 +++++++++++++++++++
 .../TestAbstractExecutionEnvironment.java       | 331 ------------------
 15 files changed, 663 insertions(+), 645 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
new file mode 100644
index 0000000..ff31eff
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.lang.reflect.Constructor;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+
+
+/**
+ * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
+ *
+ * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
+ * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
+ */
+@InterfaceStability.Unstable
+public interface ApplicationRunner {
+
+  String RUNNER_CONFIG = "job.runner.class";
+  String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner";
+
+  /**
+   * Static method to create the local {@link ApplicationRunner}.
+   *
+   * @param config  configuration passed in to initialize the Samza local process
+   * @return  the local {@link ApplicationRunner} to run the user-defined stream applications
+   */
+  static ApplicationRunner getLocalRunner(Config config) {
+    return null;
+  }
+
+  /**
+   * Static method to load the {@link ApplicationRunner}
+   *
+   * Requires the implementation class to define a constructor with a single {@link Config} as the argument.
+   *
+   * @param config  configuration passed in to initialize the Samza processes
+   * @return  the configure-driven {@link ApplicationRunner} to run the user-defined stream applications
+   */
+  static ApplicationRunner fromConfig(Config config) {
+    try {
+      Class<?> runnerClass = Class.forName(config.get(RUNNER_CONFIG, DEFAULT_RUNNER_CLASS));
+      if (ApplicationRunner.class.isAssignableFrom(runnerClass)) {
+        Constructor<?> constructor = runnerClass.getConstructor(Config.class); // *sigh*
+        return (ApplicationRunner) constructor.newInstance(config);
+      }
+    } catch (Exception e) {
+      throw new ConfigException(String.format("Problem in loading ApplicationRunner class %s", config.get(
+          RUNNER_CONFIG)), e);
+    }
+    throw new ConfigException(String.format(
+        "Class %s does not implement interface ApplicationRunner properly",
+        config.get(RUNNER_CONFIG)));
+  }
+
+  /**
+   * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}
+   *
+   * @param graphBuilder  the user-defined {@link StreamGraphBuilder} object
+   * @param config  the {@link Config} object for this job
+   */
+  void run(StreamGraphBuilder graphBuilder, Config config);
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   * <br>
+   * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
+   * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
+   *
+   * <ul>
+   *   <li>samza.system -         The name of the System on which this stream will be used. If this property isn't defined
+   *                              the stream will be associated with the System defined in {@code job.default.system}</li>
+   *   <li>samza.physical.name -  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   *                              If this property isn't defined the physical.name will be set to the streamId</li>
+   * </ul>
+   *
+   * @param streamId  The logical identifier for the stream in Samza.
+   * @return          The {@link StreamSpec} instance.
+   */
+  StreamSpec streamFromConfig(String streamId);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
deleted file mode 100644
index 8444c91..0000000
--- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import java.lang.reflect.Constructor;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-
-
-/**
- * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
- *
- * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
- * to support the {@link ExecutionEnvironment#fromConfig(Config)} static constructor.
- */
-@InterfaceStability.Unstable
-public interface ExecutionEnvironment {
-
-  String ENVIRONMENT_CONFIG = "job.execution.environment.class";
-  String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment";
-
-  /**
-   * Static method to load the local standalone environment
-   *
-   * @param config  configuration passed in to initialize the Samza standalone process
-   * @return  the standalone {@link ExecutionEnvironment} to run the user-defined stream applications
-   */
-  static ExecutionEnvironment getLocalEnvironment(Config config) {
-    return null;
-  }
-
-  /**
-   * Static method to load the non-standalone environment.
-   *
-   * Requires the implementation class to define a constructor with a single {@link Config} as the argument.
-   *
-   * @param config  configuration passed in to initialize the Samza processes
-   * @return  the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications
-   */
-  static ExecutionEnvironment fromConfig(Config config) {
-    try {
-      Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS));
-      if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) {
-        Constructor<?> constructor = environmentClass.getConstructor(Config.class); // *sigh*
-        return (ExecutionEnvironment) constructor.newInstance(config);
-      }
-    } catch (Exception e) {
-      throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
-    }
-    throw new ConfigException(String.format(
-        "Class %s does not implement interface ExecutionEnvironment properly",
-        config.get(ENVIRONMENT_CONFIG)));
-  }
-
-  /**
-   * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}
-   *
-   * @param graphBuilder  the user-defined {@link StreamGraphBuilder} object
-   * @param config  the {@link Config} object for this job
-   */
-  void run(StreamGraphBuilder graphBuilder, Config config);
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   * <br>
-   * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
-   * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
-   *
-   * <ul>
-   *   <li>samza.system -         The name of the System on which this stream will be used. If this property isn't defined
-   *                              the stream will be associated with the System defined in {@code job.default.system}</li>
-   *   <li>samza.physical.name -  The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
-   *                              If this property isn't defined the physical.name will be set to the streamId</li>
-   * </ul>
-   *
-   * @param streamId  The logical identifier for the stream in Samza.
-   * @return          The {@link StreamSpec} instance.
-   */
-  StreamSpec streamFromConfig(String streamId);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
new file mode 100644
index 0000000..8e21ec2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.StreamSpec;
+
+
+public abstract class AbstractApplicationRunner implements ApplicationRunner {
+
+  private final Config config;
+
+  public AbstractApplicationRunner(Config config) {
+    if (config == null) {
+      throw new NullPointerException("Parameter 'config' cannot be null.");
+    }
+
+    this.config = config;
+  }
+
+  @Override
+  public StreamSpec streamFromConfig(String streamId) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    String physicalName = streamConfig.getPhysicalName(streamId, streamId);
+
+    return streamFromConfig(streamId, physicalName);
+  }
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   * <br>
+   * All properties matching this pattern are assumed to be system-specific with one exception. The following
+   * property is a Samza property which is used to bind the stream to a system.
+   *
+   * <ul>
+   *   <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
+   *                      the stream will be associated with the System defined in {@code job.default.system}</li>
+   * </ul>
+   *
+   * @param streamId      The logical identifier for the stream in Samza.
+   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   * @return              The {@link StreamSpec} instance.
+   */
+  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    String system = streamConfig.getSystem(streamId);
+
+    return streamFromConfig(streamId, physicalName, system);
+  }
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{$streamId}.*}
+   *
+   * @param streamId      The logical identifier for the stream in Samza.
+   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   * @param system        The name of the System on which this stream will be used.
+   * @return              The {@link StreamSpec} instance.
+   */
+  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
+    StreamConfig streamConfig = new StreamConfig(config);
+    Map<String, String> properties = streamConfig.getStreamProperties(streamId);
+
+    return new StreamSpec(streamId, physicalName, system, properties);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
new file mode 100644
index 0000000..c936553
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+
+
+/**
+ * This class implements the {@link ApplicationRunner} that runs the applications in standalone environment
+ */
+public class LocalApplicationRunner extends AbstractApplicationRunner {
+
+  public LocalApplicationRunner(Config config) {
+    super(config);
+  }
+
+  // TODO: may want to move this to a common base class for all {@link ApplicationRunner}
+  StreamGraph createGraph(StreamGraphBuilder app, Config config) {
+    StreamGraphImpl graph = new StreamGraphImpl();
+    app.init(graph, config);
+    return graph;
+  }
+
+  @Override public void run(StreamGraphBuilder app, Config config) {
+    // 1. get logic graph for optimization
+    // StreamGraph logicGraph = this.createGraph(app, config);
+    // 2. potential optimization....
+    // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
+    // 4. create all input/output/intermediate topics
+    // 5. create the configuration for StreamProcessor
+    // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
new file mode 100644
index 0000000..6e33fe8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+
+
+/**
+ * This class implements the {@link ApplicationRunner} that runs the applications in a remote cluster
+ */
+public class RemoteApplicationRunner extends AbstractApplicationRunner {
+
+  public RemoteApplicationRunner(Config config) {
+    super(config);
+  }
+
+  @Override public void run(StreamGraphBuilder app, Config config) {
+    // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
+    // TODO: actually instantiate the tasks and run the job, i.e.
+    // 1. create all input/output/intermediate topics
+    // 2. create the single job configuration
+    // 3. execute JobRunner to submit the single job for the whole graph
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
deleted file mode 100644
index 64d60b7..0000000
--- a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.StreamConfig;
-
-
-public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {
-
-  private final Config config;
-
-  public AbstractExecutionEnvironment(Config config) {
-    if (config == null) {
-      throw new NullPointerException("Parameter 'config' cannot be null.");
-    }
-
-    this.config = config;
-  }
-
-  @Override
-  public StreamSpec streamFromConfig(String streamId) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    String physicalName = streamConfig.getPhysicalName(streamId, streamId);
-
-    return streamFromConfig(streamId, physicalName);
-  }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   * <br>
-   * All properties matching this pattern are assumed to be system-specific with one exception. The following
-   * property is a Samza property which is used to bind the stream to a system.
-   *
-   * <ul>
-   *   <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
-   *                      the stream will be associated with the System defined in {@code job.default.system}</li>
-   * </ul>
-   *
-   * @param streamId      The logical identifier for the stream in Samza.
-   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
-   * @return              The {@link StreamSpec} instance.
-   */
-  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    String system = streamConfig.getSystem(streamId);
-
-    return streamFromConfig(streamId, physicalName, system);
-  }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   *
-   * @param streamId      The logical identifier for the stream in Samza.
-   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
-   * @param system        The name of the System on which this stream will be used.
-   * @return              The {@link StreamSpec} instance.
-   */
-  /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    Map<String, String> properties = streamConfig.getStreamProperties(streamId);
-
-    return new StreamSpec(streamId, physicalName, system, properties);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
deleted file mode 100644
index e592e66..0000000
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
- */
-public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
-
-  public RemoteExecutionEnvironment(Config config) {
-    super(config);
-  }
-
-  @Override public void run(StreamGraphBuilder app, Config config) {
-    // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
-    // TODO: actually instantiate the tasks and run the job, i.e.
-    // 1. create all input/output/intermediate topics
-    // 2. create the single job configuration
-    // 3. execute JobRunner to submit the single job for the whole graph
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
deleted file mode 100644
index 71d60ef..0000000
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraphImpl;
-
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
- */
-public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment {
-
-  public StandaloneExecutionEnvironment(Config config) {
-    super(config);
-  }
-
-  // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
-  StreamGraph createGraph(StreamGraphBuilder app, Config config) {
-    StreamGraphImpl graph = new StreamGraphImpl();
-    app.init(graph, config);
-    return graph;
-  }
-
-  @Override public void run(StreamGraphBuilder app, Config config) {
-    // 1. get logic graph for optimization
-    // StreamGraph logicGraph = this.createGraph(app, config);
-    // 2. potential optimization....
-    // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
-    // 4. create all input/output/intermediate topics
-    // 5. create the configuration for StreamProcessor
-    // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 4a0681e..e54d01a 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -31,7 +31,7 @@ import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
@@ -43,15 +43,14 @@ import org.apache.samza.util.CommandLine;
 public class KeyValueStoreExample implements StreamGraphBuilder {
 
   /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
+   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
+   * invoking context as in local runner:
    *
    *   public static void main(String args[]) throws Exception {
    *     CommandLine cmdLine = new CommandLine();
    *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
-   *     UserMainExample runnableApp = new UserMainExample();
-   *     runnableApp.run(remoteEnv, config);
+   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+   *     runner.run(new UserMainExample(), config);
    *   }
    *
    */
@@ -67,12 +66,12 @@ public class KeyValueStoreExample implements StreamGraphBuilder {
 
   }
 
-  // standalone local program model
+  // local program model
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new KeyValueStoreExample(), config);
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new KeyValueStoreExample(), config);
   }
 
   class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
index 320680c..ef58c8b 100644
--- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -27,7 +27,7 @@ import org.apache.samza.operators.data.Offset;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.CommandLine;
@@ -89,15 +89,16 @@ public class NoContextStreamExample implements StreamGraphBuilder {
     }
   }
 
+
   /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
+   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
+   * invoking context as in local runner:
    *
    *   public static void main(String args[]) throws Exception {
    *     CommandLine cmdLine = new CommandLine();
    *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
-   *     remoteEnv.run(new NoContextStreamExample(), config);
+   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+   *     runner.run(new NoContextStreamExample(), config);
    *   }
    *
    */
@@ -119,8 +120,8 @@ public class NoContextStreamExample implements StreamGraphBuilder {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new NoContextStreamExample(), config);
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new NoContextStreamExample(), config);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 30ce7d2..0e60c2f 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -27,7 +27,7 @@ import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
@@ -38,15 +38,14 @@ import org.apache.samza.util.CommandLine;
 public class OrderShipmentJoinExample implements StreamGraphBuilder {
 
   /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
+   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
+   * invoking context as in local runner:
    *
    *   public static void main(String args[]) throws Exception {
    *     CommandLine cmdLine = new CommandLine();
    *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
-   *     UserMainExample runnableApp = new UserMainExample();
-   *     runnableApp.run(remoteEnv, config);
+   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+   *     runner.run(new UserMainExample(), config);
    *   }
    *
    */
@@ -64,8 +63,8 @@ public class OrderShipmentJoinExample implements StreamGraphBuilder {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new OrderShipmentJoinExample(), config);
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new OrderShipmentJoinExample(), config);
   }
 
   StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index fcf67a7..886bf1c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
@@ -57,8 +57,8 @@ public class PageViewCounterExample implements StreamGraphBuilder {
   public static void main(String[] args) {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new PageViewCounterExample(), config);
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new PageViewCounterExample(), config);
   }
 
   StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 228668c..b8abacf 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -26,7 +26,7 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
@@ -39,6 +39,19 @@ import java.time.Duration;
 public class RepartitionExample implements StreamGraphBuilder {
 
   /**
+   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
+   * invoking context as in local runner:
+   *
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+   *     runner.run(new UserMainExample(), config);
+   *   }
+   *
+   */
+
+  /**
    * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
    * invoking context as in standalone:
    *
@@ -68,8 +81,8 @@ public class RepartitionExample implements StreamGraphBuilder {
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
-    standaloneEnv.run(new RepartitionExample(), config);
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new RepartitionExample(), config);
   }
 
   StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java
new file mode 100644
index 0000000..4942ed6
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+public class TestAbstractExecutionEnvironment {
+  private static final String STREAM_ID = "t3st-Stream_Id";
+  private static final String STREAM_ID_INVALID = "test#Str3amId!";
+
+  private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
+  private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
+  private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?";
+
+  private static final String TEST_SYSTEM = "t3st-System_Name";
+  private static final String TEST_SYSTEM2 = "testSystemName2";
+  private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
+
+  private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
+
+
+  @Test(expected = NullPointerException.class)
+  public void testConfigValidation() {
+    new TestAbstractApplicationRunnerImpl(null);
+  }
+
+  // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
+  @Test
+  public void testStreamFromConfigWithPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+  }
+
+  // The streamId should be used as the physicalName when the physical name is not specified.
+  // NOTE: it's either this, set to null, or exception. This seems better for backward compatibility and API brevity.
+  @Test
+  public void testStreamFromConfigWithoutPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(STREAM_ID, spec.getPhysicalName());
+  }
+
+  // If the system is specified at the stream scope, use it
+  @Test
+  public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // If system isn't specified at stream scope, use the default system
+  @Test
+  public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+                                                  StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
+                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
+  }
+
+  // Stream scope should override default scope
+  @Test
+  public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+                                                StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                                StreamConfig.SYSTEM(), TEST_SYSTEM),
+                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // System is required. Throw if it cannot be determined.
+  @Test(expected = Exception.class)
+  public void testStreamFromConfigWithOutSystemInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
+  @Test
+  public void testStreamFromConfigPropertiesPassthrough() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                    StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
+                                    "systemProperty1", "systemValue1",
+                                    "systemProperty2", "systemValue2",
+                                    "systemProperty3", "systemValue3");
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    assertEquals("systemValue1", properties.get("systemProperty1"));
+    assertEquals("systemValue2", properties.get("systemProperty2"));
+    assertEquals("systemValue3", properties.get("systemProperty3"));
+    assertEquals("systemValue1", spec.get("systemProperty1"));
+    assertEquals("systemValue2", spec.get("systemProperty2"));
+    assertEquals("systemValue3", spec.get("systemProperty3"));
+  }
+
+  // The samza properties (which are invalid for the underlying system) should be filtered out.
+  @Test
+  public void testStreamFromConfigSamzaPropertiesOmitted() {
+    Config config = buildStreamConfig(STREAM_ID,
+                              StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
+                                    "systemProperty1", "systemValue1",
+                                    "systemProperty2", "systemValue2",
+                                    "systemProperty3", "systemValue3");
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(3, properties.size());
+    assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
+    assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
+  }
+
+  // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
+  @Test
+  public void testStreamFromConfigPhysicalNameArgSimple() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are allowed for the physical name
+  @Test
+  public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
+    assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
+  }
+
+  // Null is allowed for the physical name
+  @Test
+  public void testStreamFromConfigPhysicalNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
+    assertNull(spec.getPhysicalName());
+  }
+
+  // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
+  @Test
+  public void testStreamFromConfigSystemNameArgValid() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);              // This too
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are NOT allowed for system name, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigSystemNameArgInvalid() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
+  }
+
+  // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigSystemNameArgEmpty() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+        StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, "");
+  }
+
+  // Null is not allowed for system name.
+  @Test(expected = NullPointerException.class)
+  public void testStreamFromConfigSystemNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
+  }
+
+  // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigStreamIdInvalid() {
+    Config config = buildStreamConfig(STREAM_ID_INVALID,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    env.streamFromConfig(STREAM_ID_INVALID);
+  }
+
+  // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
+  @Test(expected = IllegalArgumentException.class)
+  public void testStreamFromConfigStreamIdEmpty() {
+    Config config = buildStreamConfig("",
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    env.streamFromConfig("");
+  }
+
+  // Null is not allowed for streamId.
+  @Test(expected = NullPointerException.class)
+  public void testStreamFromConfigStreamIdNull() {
+    Config config = buildStreamConfig(null,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+    env.streamFromConfig(null);
+  }
+
+
+  // Helper methods
+
+  private Config buildStreamConfig(String streamId, String... kvs) {
+    // inject streams.x. into each key
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i];
+    }
+    return buildConfig(kvs);
+  }
+
+  private Config buildConfig(String... kvs) {
+    if (kvs.length % 2 != 0) {
+      throw new IllegalArgumentException("There must be parity between the keys and values");
+    }
+
+    Map<String, String> configMap = new HashMap<>();
+    for (int i = 0; i < kvs.length - 1; i += 2) {
+      configMap.put(kvs[i], kvs[i + 1]);
+    }
+    return new MapConfig(configMap);
+  }
+
+  private Config addConfigs(Config original, String... kvs) {
+    Map<String, String> result = new HashMap<>();
+    result.putAll(original);
+    result.putAll(buildConfig(kvs));
+    return new MapConfig(result);
+  }
+
+  private class TestAbstractApplicationRunnerImpl extends AbstractApplicationRunner {
+
+    public TestAbstractApplicationRunnerImpl(Config config) {
+      super(config);
+    }
+
+    @Override
+    public void run(StreamGraphBuilder graphBuilder, Config config) {
+      // do nothing
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
deleted file mode 100644
index 861f049..0000000
--- a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-public class TestAbstractExecutionEnvironment {
-  private static final String STREAM_ID = "t3st-Stream_Id";
-  private static final String STREAM_ID_INVALID = "test#Str3amId!";
-
-  private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
-  private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
-  private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?";
-
-  private static final String TEST_SYSTEM = "t3st-System_Name";
-  private static final String TEST_SYSTEM2 = "testSystemName2";
-  private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
-
-  private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
-
-
-  @Test(expected = NullPointerException.class)
-  public void testConfigValidation() {
-    new TestAbstractExecutionEnvironmentImpl(null);
-  }
-
-  // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
-  @Test
-  public void testStreamFromConfigWithPhysicalNameInConfig() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
-    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
-  }
-
-  // The streamId should be used as the physicalName when the physical name is not specified.
-  // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity.
-  @Test
-  public void testStreamFromConfigWithoutPhysicalNameInConfig() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
-    assertEquals(STREAM_ID, spec.getPhysicalName());
-  }
-
-  // If the system is specified at the stream scope, use it
-  @Test
-  public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // If system isn't specified at stream scope, use the default system
-  @Test
-  public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
-    Config config = addConfigs(buildStreamConfig(STREAM_ID,
-                                                  StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
-                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
-    assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
-  }
-
-  // Stream scope should override default scope
-  @Test
-  public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
-    Config config = addConfigs(buildStreamConfig(STREAM_ID,
-                                                StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                                StreamConfig.SYSTEM(), TEST_SYSTEM),
-                                JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // System is required. Throw if it cannot be determined.
-  @Test(expected = Exception.class)
-  public void testStreamFromConfigWithOutSystemInConfig() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
-  @Test
-  public void testStreamFromConfigPropertiesPassthrough() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                    StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
-                                    "systemProperty1", "systemValue1",
-                                    "systemProperty2", "systemValue2",
-                                    "systemProperty3", "systemValue3");
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
-    Map<String, String> properties = spec.getConfig();
-    assertEquals(3, properties.size());
-    assertEquals("systemValue1", properties.get("systemProperty1"));
-    assertEquals("systemValue2", properties.get("systemProperty2"));
-    assertEquals("systemValue3", properties.get("systemProperty3"));
-    assertEquals("systemValue1", spec.get("systemProperty1"));
-    assertEquals("systemValue2", spec.get("systemProperty2"));
-    assertEquals("systemValue3", spec.get("systemProperty3"));
-  }
-
-  // The samza properties (which are invalid for the underlying system) should be filtered out.
-  @Test
-  public void testStreamFromConfigSamzaPropertiesOmitted() {
-    Config config = buildStreamConfig(STREAM_ID,
-                              StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
-                                    StreamConfig.SYSTEM(), TEST_SYSTEM,
-                                    "systemProperty1", "systemValue1",
-                                    "systemProperty2", "systemValue2",
-                                    "systemProperty3", "systemValue3");
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
-    Map<String, String> properties = spec.getConfig();
-    assertEquals(3, properties.size());
-    assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
-    assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
-    assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
-    assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
-  }
-
-  // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
-  @Test
-  public void testStreamFromConfigPhysicalNameArgSimple() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
-
-    assertEquals(STREAM_ID, spec.getId());
-    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // Special characters are allowed for the physical name
-  @Test
-  public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
-    assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
-  }
-
-  // Null is allowed for the physical name
-  @Test
-  public void testStreamFromConfigPhysicalNameArgNull() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
-    assertNull(spec.getPhysicalName());
-  }
-
-  // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
-  @Test
-  public void testStreamFromConfigSystemNameArgValid() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);              // This too
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
-
-    assertEquals(STREAM_ID, spec.getId());
-    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
-    assertEquals(TEST_SYSTEM, spec.getSystemName());
-  }
-
-  // Special characters are NOT allowed for system name, because it's used as an identifier in the config.
-  @Test(expected = IllegalArgumentException.class)
-  public void testStreamFromConfigSystemNameArgInvalid() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
-  }
-
-  // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
-  @Test(expected = IllegalArgumentException.class)
-  public void testStreamFromConfigSystemNameArgEmpty() {
-    Config config = buildStreamConfig(STREAM_ID,
-        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
-        StreamConfig.SYSTEM(), TEST_SYSTEM2);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, "");
-  }
-
-  // Null is not allowed for system name.
-  @Test(expected = NullPointerException.class)
-  public void testStreamFromConfigSystemNameArgNull() {
-    Config config = buildStreamConfig(STREAM_ID,
-                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
-                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
-  }
-
-  // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
-  @Test(expected = IllegalArgumentException.class)
-  public void testStreamFromConfigStreamIdInvalid() {
-    Config config = buildStreamConfig(STREAM_ID_INVALID,
-        StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    env.streamFromConfig(STREAM_ID_INVALID);
-  }
-
-  // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
-  @Test(expected = IllegalArgumentException.class)
-  public void testStreamFromConfigStreamIdEmpty() {
-    Config config = buildStreamConfig("",
-        StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    env.streamFromConfig("");
-  }
-
-  // Null is not allowed for streamId.
-  @Test(expected = NullPointerException.class)
-  public void testStreamFromConfigStreamIdNull() {
-    Config config = buildStreamConfig(null,
-        StreamConfig.SYSTEM(), TEST_SYSTEM);
-
-    AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
-    env.streamFromConfig(null);
-  }
-
-
-  // Helper methods
-
-  private Config buildStreamConfig(String streamId, String... kvs) {
-    // inject streams.x. into each key
-    for (int i = 0; i < kvs.length - 1; i += 2) {
-      kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i];
-    }
-    return buildConfig(kvs);
-  }
-
-  private Config buildConfig(String... kvs) {
-    if (kvs.length % 2 != 0) {
-      throw new IllegalArgumentException("There must be parity between the keys and values");
-    }
-
-    Map<String, String> configMap = new HashMap<>();
-    for (int i = 0; i < kvs.length - 1; i += 2) {
-      configMap.put(kvs[i], kvs[i + 1]);
-    }
-    return new MapConfig(configMap);
-  }
-
-  private Config addConfigs(Config original, String... kvs) {
-    Map<String, String> result = new HashMap<>();
-    result.putAll(original);
-    result.putAll(buildConfig(kvs));
-    return new MapConfig(result);
-  }
-
-  private class TestAbstractExecutionEnvironmentImpl extends AbstractExecutionEnvironment {
-
-    public TestAbstractExecutionEnvironmentImpl(Config config) {
-      super(config);
-    }
-
-    @Override
-    public void run(StreamGraphBuilder graphBuilder, Config config) {
-      // do nothing
-    }
-  }
-}