You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 02:23:09 UTC

[2/2] samza git commit: Application Descriptor implementation cleanup.

Application Descriptor implementation cleanup.

Major changes:
1. Move withDefaultSystem to the base ApplicationDescriptor interface so that it can be used for TaskApplication as well.

2. Move some of the common state (input/output/system/table descriptors) and the corresponding helper methods from StreamApplicationDescriptorImpl/TaskApplicationDescriptorImpl to the base ApplicationDescriptorImpl class.

Author: Prateek Maheshwari <pm...@apache.org>

Reviewers: Cameron Lee <ca...@linkedin.com>, Yi Pan <ni...@gmail.com>

Closes #702 from prateekm/app-descriptor-cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ef359972
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ef359972
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ef359972

Branch: refs/heads/master
Commit: ef359972161e6c12f9b4eb16d9c1a34446daa1cb
Parents: 74675ce
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Fri Oct 12 19:22:54 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Fri Oct 12 19:22:54 2018 -0700

----------------------------------------------------------------------
 .../descriptors/ApplicationDescriptor.java      |  12 +
 .../StreamApplicationDescriptor.java            |  12 -
 .../descriptors/ApplicationDescriptorImpl.java  | 178 ++++++++----
 .../StreamApplicationDescriptorImpl.java        | 185 ++-----------
 .../TaskApplicationDescriptorImpl.java          |  76 +-----
 .../org/apache/samza/execution/JobGraph.java    |   2 +-
 .../samza/operators/OperatorSpecGraph.java      |   8 +-
 .../TestStreamApplicationDescriptorImpl.java    | 116 ++++----
 .../samza/execution/TestExecutionPlanner.java   |   4 +-
 .../operators/impl/TestOperatorImplGraph.java   | 269 ++++++++++---------
 .../spec/TestPartitionByOperatorSpec.java       | 128 +++++----
 .../WatermarkIntegrationTest.java               |   2 +-
 12 files changed, 441 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
index b1e78b0..b2d54ca 100644
--- a/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.samza.context.ApplicationContainerContextFactory;
 import org.apache.samza.context.ApplicationTaskContextFactory;
 import org.apache.samza.metrics.MetricsReporterFactory;
 import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.system.descriptors.SystemDescriptor;
 
 
 /**
@@ -47,6 +48,17 @@ public interface ApplicationDescriptor<S extends ApplicationDescriptor> {
   Config getConfig();
 
   /**
+   * Sets the default SystemDescriptor to use for the application. This is equivalent to setting
+   * {@code job.default.system} and its properties in configuration.
+   * <p>
+   * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
+   *
+   * @param defaultSystemDescriptor the default system descriptor to use
+   * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
+   */
+  S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
+
+  /**
    * Sets the {@link ApplicationContainerContextFactory} for this application. Each task will be given access to a
    * different instance of the {@link org.apache.samza.context.ApplicationContainerContext} that this creates. The
    * context can be accessed through the {@link org.apache.samza.context.Context}.

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
index 383e9ce..3a35054 100644
--- a/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptor.java
@@ -25,7 +25,6 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.table.Table;
 
 
@@ -36,17 +35,6 @@ import org.apache.samza.table.Table;
 public interface StreamApplicationDescriptor extends ApplicationDescriptor<StreamApplicationDescriptor> {
 
   /**
-   * Sets the default SystemDescriptor to use for intermediate streams. This is equivalent to setting
-   * {@code job.default.system} and its properties in configuration.
-   * <p>
-   * If the default system descriptor is set, it must be set <b>before</b> creating any input/output/intermediate streams.
-   *
-   * @param defaultSystemDescriptor the default system descriptor to use
-   * @return type {@code S} of {@link ApplicationDescriptor} with {@code defaultSystemDescriptor} set as its default system
-   */
-  StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor);
-
-  /**
    * Gets the input {@link MessageStream} corresponding to the {@code inputDescriptor}.
    * <p>
    * A {@code MessageStream<KV<K, V>}, obtained by calling this method with a descriptor with a {@code KVSerde<K, V>},

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
index f3c34a9..743f9dc 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java
@@ -18,13 +18,19 @@
  */
 package org.apache.samza.application.descriptors;
 
+import com.google.common.base.Preconditions;
+
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.samza.application.SamzaApplication;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.Config;
 import org.apache.samza.context.ApplicationContainerContext;
 import org.apache.samza.context.ApplicationContainerContextFactory;
@@ -34,6 +40,7 @@ import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.OutputDescriptor;
 import org.apache.samza.metrics.MetricsReporterFactory;
 import org.apache.samza.operators.KV;
+import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.operators.spec.InputOperatorSpec;
@@ -56,17 +63,25 @@ import org.slf4j.LoggerFactory;
  * @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either
  *            {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor}
  */
-public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
-    implements ApplicationDescriptor<S> {
+public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> implements ApplicationDescriptor<S> {
   private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);
+  static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
 
   private final Class<? extends SamzaApplication> appClass;
+  private final Config config;
+
+  // We use LHMs (LinkedHashMaps) for deterministic order in initializing and closing operators.
+  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
+  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
+  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
+  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
+  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
+
   private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
   // serdes used by input/output/intermediate streams, keyed by streamId
   private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
   // serdes used by tables, keyed by tableId
   private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
-  final Config config;
 
   private Optional<ApplicationContainerContextFactory<?>> applicationContainerContextFactoryOptional = Optional.empty();
   private Optional<ApplicationTaskContextFactory<?>> applicationTaskContextFactoryOptional = Optional.empty();
@@ -80,30 +95,41 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
   }
 
   @Override
-  public Config getConfig() {
+  public final Config getConfig() {
     return config;
   }
 
   @Override
-  public S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
+  public final S withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
+    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
+    Preconditions.checkState(getInputStreamIds().isEmpty() && getOutputStreamIds().isEmpty(),
+        "Default system must be set before creating any input or output streams.");
+    addSystemDescriptor(defaultSystemDescriptor);
+
+    defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
+    return (S) this;
+  }
+
+  @Override
+  public final S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
     this.applicationContainerContextFactoryOptional = Optional.of(factory);
     return (S) this;
   }
 
   @Override
-  public S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
+  public final S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
     this.applicationTaskContextFactoryOptional = Optional.of(factory);
     return (S) this;
   }
 
   @Override
-  public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
+  public final S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
     this.listenerFactory = listenerFactory;
     return (S) this;
   }
 
   @Override
-  public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
+  public final S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
     this.reporterFactories.clear();
     this.reporterFactories.putAll(reporterFactories);
     return (S) this;
@@ -161,93 +187,106 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
   }
 
   /**
-   * Get the default {@link SystemDescriptor} in this application
+   * Get all the unique input streamIds in this application, including any intermediate streams.
    *
-   * @return the default {@link SystemDescriptor}
+   * @return an immutable set of input streamIds
    */
-  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
-    // default is not set
-    return Optional.empty();
+  public Set<String> getInputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
   }
 
   /**
-   * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
+   * Get all the unique output streamIds in this application, including any intermediate streams.
    *
-   * @param streamId id of the stream
-   * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+   * @return an immutable set of output streamIds
    */
-  public KV<Serde, Serde> getStreamSerdes(String streamId) {
-    return streamSerdes.get(streamId);
+  public Set<String> getOutputStreamIds() {
+    return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
   }
 
   /**
-   * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
+   * Get all the intermediate broadcast streamIds for this application
    *
-   * @param tableId id of the table
-   * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
+   * @return an immutable set of streamIds
    */
-  public KV<Serde, Serde> getTableSerdes(String tableId) {
-    return tableSerdes.get(tableId);
+  public Set<String> getIntermediateBroadcastStreamIds() {
+    return Collections.emptySet();
   }
 
   /**
-   * Get the map of all {@link InputOperatorSpec}s in this applicaiton
+   * Get all the {@link InputDescriptor}s to this application
    *
-   * @return an immutable map from streamId to {@link InputOperatorSpec}. Default to empty map for low-level
-   * {@link org.apache.samza.application.TaskApplication}
+   * @return an immutable map of streamId to {@link InputDescriptor}
    */
-  public Map<String, InputOperatorSpec> getInputOperators() {
-    return Collections.EMPTY_MAP;
+  public Map<String, InputDescriptor> getInputDescriptors() {
+    return Collections.unmodifiableMap(inputDescriptors);
   }
 
   /**
-   * Get all the {@link InputDescriptor}s to this application
+   * Get all the {@link OutputDescriptor}s for this application
    *
-   * @return an immutable map of streamId to {@link InputDescriptor}
+   * @return an immutable map of streamId to {@link OutputDescriptor}
    */
-  public abstract Map<String, InputDescriptor> getInputDescriptors();
+  public Map<String, OutputDescriptor> getOutputDescriptors() {
+    return Collections.unmodifiableMap(outputDescriptors);
+  }
 
   /**
-   * Get all the {@link OutputDescriptor}s from this application
+   * Get all the {@link SystemDescriptor}s in this application
    *
-   * @return an immutable map of streamId to {@link OutputDescriptor}
+   * @return an immutable set of {@link SystemDescriptor}s
    */
-  public abstract Map<String, OutputDescriptor> getOutputDescriptors();
+  public Set<SystemDescriptor> getSystemDescriptors() {
+    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+  }
 
   /**
-   * Get all the broadcast streamIds from this application
+   * Get the default {@link SystemDescriptor} in this application
    *
-   * @return an immutable set of streamIds
+   * @return the default {@link SystemDescriptor}
    */
-  public abstract Set<String> getBroadcastStreams();
+  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
+    return defaultSystemDescriptorOptional;
+  }
 
   /**
    * Get all the {@link TableDescriptor}s in this application
    *
    * @return an immutable set of {@link TableDescriptor}s
    */
-  public abstract Set<TableDescriptor> getTableDescriptors();
+  public Set<TableDescriptor> getTableDescriptors() {
+    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
+  }
 
   /**
-   * Get all the unique {@link SystemDescriptor}s in this application
+   * Get a map of all {@link InputOperatorSpec}s in this application
    *
-   * @return an immutable set of {@link SystemDescriptor}s
+   * @return an immutable map from streamId to {@link InputOperatorSpec}. Default to empty map for low-level
+   * {@link org.apache.samza.application.TaskApplication}
    */
-  public abstract Set<SystemDescriptor> getSystemDescriptors();
+  public Map<String, InputOperatorSpec> getInputOperators() {
+    return Collections.emptyMap();
+  }
 
   /**
-   * Get all the unique input streamIds in this application
+   * Get the corresponding {@link KVSerde} for the given {@code streamId}
    *
-   * @return an immutable set of input streamIds
+   * @param streamId id of the stream
+   * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
    */
-  public abstract Set<String> getInputStreamIds();
+  public KV<Serde, Serde> getStreamSerdes(String streamId) {
+    return streamSerdes.get(streamId);
+  }
 
   /**
-   * Get all the unique output streamIds in this application
+   * Get the corresponding {@link KVSerde} for the given {@code tableId}
    *
-   * @return an immutable set of output streamIds
+   * @param tableId id of the table
+   * @return the {@link KVSerde} for the table. null if the serde is not defined or {@code tableId} does not exist
    */
-  public abstract Set<String> getOutputStreamIds();
+  public KV<Serde, Serde> getTableSerdes(String tableId) {
+    return tableSerdes.get(tableId);
+  }
 
   KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
     Serde keySerde, valueSerde;
@@ -273,7 +312,7 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
       }
       streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
     } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
-      throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
+      throw new IllegalArgumentException(String.format("Serde for streamId: %s is already defined. Cannot change it to "
           + "different serdes.", streamId));
     }
     return streamSerdes.get(streamId);
@@ -297,4 +336,47 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
     return streamSerdes.get(tableId);
   }
 
+  final void addInputDescriptor(InputDescriptor inputDescriptor) {
+    String streamId = inputDescriptor.getStreamId();
+    Preconditions.checkState(!inputDescriptors.containsKey(streamId)
+            || inputDescriptors.get(streamId) == inputDescriptor,
+        String.format("Cannot add multiple input descriptors with the same streamId: %s", streamId));
+    inputDescriptors.put(streamId, inputDescriptor);
+    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+  }
+
+  final void addOutputDescriptor(OutputDescriptor outputDescriptor) {
+    String streamId = outputDescriptor.getStreamId();
+    Preconditions.checkState(!outputDescriptors.containsKey(streamId)
+            || outputDescriptors.get(streamId) == outputDescriptor,
+        String.format("Cannot add an output descriptor multiple times with the same streamId: %s", streamId));
+    outputDescriptors.put(streamId, outputDescriptor);
+    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+  }
+
+  final void addTableDescriptor(TableDescriptor tableDescriptor) {
+    String tableId = tableDescriptor.getTableId();
+    Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
+        String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
+    Preconditions.checkState(!tableDescriptors.containsKey(tableId)
+        || tableDescriptors.get(tableId) == tableDescriptor,
+        String.format("Cannot add multiple table descriptors with the same tableId: %s", tableId));
+
+    if (tableDescriptor instanceof BaseHybridTableDescriptor) {
+      List<? extends TableDescriptor> tableDescs =
+          ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
+      tableDescs.forEach(td -> addTableDescriptor(td));
+    }
+
+    tableDescriptors.put(tableId, tableDescriptor);
+  }
+
+  // check uniqueness of the {@code systemDescriptor} and add if it is unique
+  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
+    String systemName = systemDescriptor.getSystemName();
+    Preconditions.checkState(!systemDescriptors.containsKey(systemName)
+            || systemDescriptors.get(systemName) == systemDescriptor,
+        "Must not use different system descriptor instances for the same system name: " + systemName);
+    systemDescriptors.put(systemName, systemDescriptor);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
index ec36cf3..e57c957 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/StreamApplicationDescriptorImpl.java
@@ -20,14 +20,13 @@ package org.apache.samza.application.descriptors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
@@ -54,10 +53,6 @@ import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.TableSpec;
-import org.apache.samza.table.descriptors.BaseHybridTableDescriptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class defines:
@@ -67,22 +62,13 @@ import org.slf4j.LoggerFactory;
  */
 public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<StreamApplicationDescriptor>
     implements StreamApplicationDescriptor {
-  private static final Logger LOGGER = LoggerFactory.getLogger(StreamApplicationDescriptorImpl.class);
-  private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_]+");
 
-  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
-  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
-  private final Set<String> broadcastStreams = new HashSet<>();
-  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
-  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
-  // We use a LHM for deterministic order in initializing and closing operators.
+  // We use LHMs for deterministic order in initializing and closing operators.
+  private final Set<String> intermediateBroadcastStreamIds = new HashSet<>();
   private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
   private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
-  private final Map<String, TableImpl> tables = new LinkedHashMap<>();
   private final Set<String> operatorIds = new HashSet<>();
 
-  private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty();
-
   /**
    * The 0-based position of the next operator in the graph.
    * Part of the unique ID for each OperatorSpec in the graph.
@@ -96,17 +82,6 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
   }
 
   @Override
-  public StreamApplicationDescriptor withDefaultSystem(SystemDescriptor<?> defaultSystemDescriptor) {
-    Preconditions.checkNotNull(defaultSystemDescriptor, "Provided defaultSystemDescriptor must not be null.");
-    Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
-        "Default system must be set before creating any input or output streams.");
-    addSystemDescriptor(defaultSystemDescriptor);
-
-    defaultSystemDescriptorOptional = Optional.of(defaultSystemDescriptor);
-    return this;
-  }
-
-  @Override
   public <M> MessageStream<M> getInputStream(InputDescriptor<M, ?> inputDescriptor) {
     SystemDescriptor systemDescriptor = inputDescriptor.getSystemDescriptor();
     Optional<StreamExpander> expander = systemDescriptor.getExpander();
@@ -115,26 +90,11 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
     }
 
     // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
-    Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
-        String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
-    inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
-    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
+    addInputDescriptor(inputDescriptor);
 
     String streamId = inputDescriptor.getStreamId();
-    Preconditions.checkState(!inputOperators.containsKey(streamId),
-        "getInputStream must not be called multiple times with the same streamId: " + streamId);
-
     Serde serde = inputDescriptor.getSerde();
     KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
-    if (outputStreams.containsKey(streamId)) {
-      OutputStreamImpl outputStream = outputStreams.get(streamId);
-      Serde keySerde = outputStream.getKeySerde();
-      Serde valueSerde = outputStream.getValueSerde();
-      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
-          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
-              + "stream level, so the same key and message Serde must be used for both.", streamId));
-    }
-
     boolean isKeyed = serde instanceof KVSerde;
     InputTransformer transformer = inputDescriptor.getTransformer().orElse(null);
     InputOperatorSpec inputOperatorSpec =
@@ -146,26 +106,11 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
 
   @Override
   public <M> OutputStream<M> getOutputStream(OutputDescriptor<M, ?> outputDescriptor) {
-    Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
-        String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
-    outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
-    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
+    addOutputDescriptor(outputDescriptor);
 
     String streamId = outputDescriptor.getStreamId();
-    Preconditions.checkState(!outputStreams.containsKey(streamId),
-        "getOutputStream must not be called multiple times with the same streamId: " + streamId);
-
     Serde serde = outputDescriptor.getSerde();
     KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde);
-    if (inputOperators.containsKey(streamId)) {
-      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
-      Serde keySerde = inputOperatorSpec.getKeySerde();
-      Serde valueSerde = inputOperatorSpec.getValueSerde();
-      Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
-          String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
-              + "stream level, so the same key and message Serde must be used for both.", streamId));
-    }
-
     boolean isKeyed = serde instanceof KVSerde;
     outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
     return outputStreams.get(streamId);
@@ -173,80 +118,10 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
 
   @Override
   public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDescriptor) {
-
-    if (tableDescriptor instanceof BaseHybridTableDescriptor) {
-      List<? extends TableDescriptor<K, V, ?>> tableDescs = ((BaseHybridTableDescriptor) tableDescriptor).getTableDescriptors();
-      tableDescs.forEach(td -> getTable(td));
-    }
-
-    String tableId = tableDescriptor.getTableId();
-    Preconditions.checkState(StringUtils.isNotBlank(tableId) && ID_PATTERN.matcher(tableId).matches(),
-        String.format("tableId: %s must confirm to pattern: %s", tableId, ID_PATTERN.toString()));
-    Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
-        String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
-    tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
-
+    addTableDescriptor(tableDescriptor);
     BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
-    TableSpec tableSpec = baseTableDescriptor.getTableSpec();
-    if (tables.containsKey(tableSpec.getId())) {
-      throw new IllegalStateException(
-          String.format("getTable() invoked multiple times with the same tableId: %s", tableId));
-    }
-    tables.put(tableSpec.getId(), new TableImpl(tableSpec));
-    getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde());
-    return tables.get(tableSpec.getId());
-  }
-
-  /**
-   * Get all the {@link InputDescriptor}s to this application
-   *
-   * @return an immutable map of streamId to {@link InputDescriptor}
-   */
-  @Override
-  public Map<String, InputDescriptor> getInputDescriptors() {
-    return Collections.unmodifiableMap(inputDescriptors);
-  }
-
-  /**
-   * Get all the {@link OutputDescriptor}s from this application
-   *
-   * @return an immutable map of streamId to {@link OutputDescriptor}
-   */
-  @Override
-  public Map<String, OutputDescriptor> getOutputDescriptors() {
-    return Collections.unmodifiableMap(outputDescriptors);
-  }
-
-  /**
-   * Get all the broadcast streamIds from this application
-   *
-   * @return an immutable set of streamIds
-   */
-  @Override
-  public Set<String> getBroadcastStreams() {
-    return Collections.unmodifiableSet(broadcastStreams);
-  }
-
-  /**
-   * Get all the {@link TableDescriptor}s in this application
-   *
-   * @return an immutable set of {@link TableDescriptor}s
-   */
-  @Override
-  public Set<TableDescriptor> getTableDescriptors() {
-    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
-  }
-
-  /**
-   * Get all the unique {@link SystemDescriptor}s in this application
-   *
-   * @return an immutable set of {@link SystemDescriptor}s
-   */
-  @Override
-  public Set<SystemDescriptor> getSystemDescriptors() {
-    // We enforce that users must not use different system descriptor instances for the same system name
-    // when getting an input/output stream or setting the default system descriptor
-    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
+    getOrCreateTableSerdes(baseTableDescriptor.getTableId(), baseTableDescriptor.getSerde());
+    return new TableImpl(baseTableDescriptor.getTableSpec());
   }
 
   @Override
@@ -259,14 +134,17 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
     return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet()));
   }
 
-  /**
-   * Get the default {@link SystemDescriptor} in this application
-   *
-   * @return the default {@link SystemDescriptor}
-   */
   @Override
-  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
-    return defaultSystemDescriptorOptional;
+  public Set<String> getIntermediateBroadcastStreamIds() {
+    return Collections.unmodifiableSet(intermediateBroadcastStreamIds);
+  }
+
+  public Map<String, InputOperatorSpec> getInputOperators() {
+    return Collections.unmodifiableMap(inputOperators);
+  }
+
+  public Map<String, OutputStreamImpl> getOutputStreams() {
+    return Collections.unmodifiableMap(outputStreams);
   }
 
   public OperatorSpecGraph getOperatorSpecGraph() {
@@ -286,9 +164,10 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
       throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
     }
 
+    JobConfig jobConfig = new JobConfig(getConfig());
     String nextOpId = String.format("%s-%s-%s-%s",
-        config.get(JobConfig.JOB_NAME()),
-        config.get(JobConfig.JOB_ID(), "1"),
+        jobConfig.getName().get(),
+        jobConfig.getJobId(),
         opCode.name().toLowerCase(),
         StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum));
     if (!operatorIds.add(nextOpId)) {
@@ -310,18 +189,6 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
     return getNextOpId(opCode, null);
   }
 
-  public Map<String, InputOperatorSpec> getInputOperators() {
-    return Collections.unmodifiableMap(inputOperators);
-  }
-
-  public Map<String, OutputStreamImpl> getOutputStreams() {
-    return Collections.unmodifiableMap(outputStreams);
-  }
-
-  public Map<String, TableImpl> getTables() {
-    return Collections.unmodifiableMap(tables);
-  }
-
   /**
    * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
    * An intermediate {@link MessageStream} is both an output and an input stream.
@@ -340,7 +207,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
         "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
 
     if (isBroadcast) {
-      broadcastStreams.add(streamId);
+      intermediateBroadcastStreamIds.add(streamId);
     }
 
     boolean isKeyed = serde instanceof KVSerde;
@@ -356,12 +223,4 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S
     outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
     return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
   }
-
-  // check uniqueness of the {@code systemDescriptor} and add if it is unique
-  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
-    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
-            || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
-        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
-    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
index cb924ab..c62a455 100644
--- a/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/TaskApplicationDescriptorImpl.java
@@ -18,19 +18,12 @@
  */
 package org.apache.samza.application.descriptors;
 
-import com.google.common.base.Preconditions;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.OutputDescriptor;
 import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.task.TaskFactory;
 
 
@@ -43,13 +36,6 @@ import org.apache.samza.task.TaskFactory;
  */
 public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<TaskApplicationDescriptor>
     implements TaskApplicationDescriptor {
-
-  private final Map<String, InputDescriptor> inputDescriptors = new LinkedHashMap<>();
-  private final Map<String, OutputDescriptor> outputDescriptors = new LinkedHashMap<>();
-  private final Set<String> broadcastStreams = new HashSet<>();
-  private final Map<String, TableDescriptor> tableDescriptors = new LinkedHashMap<>();
-  private final Map<String, SystemDescriptor> systemDescriptors = new LinkedHashMap<>();
-
   private TaskFactory taskFactory = null;
 
   public TaskApplicationDescriptorImpl(TaskApplication userApp, Config config) {
@@ -65,65 +51,21 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
   @Override
   public void addInputStream(InputDescriptor inputDescriptor) {
     // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream
-    Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()),
-        String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId()));
+    addInputDescriptor(inputDescriptor);
     getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde());
-    inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor);
-    addSystemDescriptor(inputDescriptor.getSystemDescriptor());
   }
 
   @Override
   public void addOutputStream(OutputDescriptor outputDescriptor) {
-    Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()),
-        String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId()));
+    addOutputDescriptor(outputDescriptor);
     getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde());
-    outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor);
-    addSystemDescriptor(outputDescriptor.getSystemDescriptor());
   }
 
   @Override
   public void addTable(TableDescriptor tableDescriptor) {
-    Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()),
-        String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId()));
-    getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde());
-    tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor);
-  }
-
-  @Override
-  public Map<String, InputDescriptor> getInputDescriptors() {
-    return Collections.unmodifiableMap(inputDescriptors);
-  }
-
-  @Override
-  public Map<String, OutputDescriptor> getOutputDescriptors() {
-    return Collections.unmodifiableMap(outputDescriptors);
-  }
-
-  @Override
-  public Set<String> getBroadcastStreams() {
-    return Collections.unmodifiableSet(broadcastStreams);
-  }
-
-  @Override
-  public Set<TableDescriptor> getTableDescriptors() {
-    return Collections.unmodifiableSet(new HashSet<>(tableDescriptors.values()));
-  }
-
-  @Override
-  public Set<SystemDescriptor> getSystemDescriptors() {
-    // We enforce that users must not use different system descriptor instances for the same system name
-    // when getting an input/output stream or setting the default system descriptor
-    return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values()));
-  }
-
-  @Override
-  public Set<String> getInputStreamIds() {
-    return Collections.unmodifiableSet(new HashSet<>(inputDescriptors.keySet()));
-  }
-
-  @Override
-  public Set<String> getOutputStreamIds() {
-    return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet()));
+    addTableDescriptor(tableDescriptor);
+    BaseTableDescriptor baseTableDescriptor = (BaseTableDescriptor) tableDescriptor;
+    getOrCreateTableSerdes(baseTableDescriptor.getTableId(), baseTableDescriptor.getSerde());
   }
 
   /**
@@ -133,12 +75,4 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas
   public TaskFactory getTaskFactory() {
     return taskFactory;
   }
-
-  // check uniqueness of the {@code systemDescriptor} and add if it is unique
-  private void addSystemDescriptor(SystemDescriptor systemDescriptor) {
-    Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName())
-            || systemDescriptors.get(systemDescriptor.getSystemName()) == systemDescriptor,
-        "Must not use different system descriptor instances for the same system name: " + systemDescriptor.getSystemName());
-    systemDescriptors.put(systemDescriptor.getSystemName(), systemDescriptor);
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 6d9faf3..7944dd3 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -273,7 +273,7 @@ import org.slf4j.LoggerFactory;
     String streamId = streamSpec.getId();
     StreamEdge edge = edges.get(streamId);
     if (edge == null) {
-      boolean isBroadcast = appDesc.getBroadcastStreams().contains(streamId);
+      boolean isBroadcast = appDesc.getIntermediateBroadcastStreamIds().contains(streamId);
       edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
       edges.put(streamId, edge);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index a83739d..a34d603 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -43,7 +43,7 @@ public class OperatorSpecGraph implements Serializable {
   // We use a LHM for deterministic order in initializing and closing operators.
   private final Map<String, InputOperatorSpec> inputOperators;
   private final Map<String, OutputStreamImpl> outputStreams;
-  private final Set<String> broadcastStreams;
+  private final Set<String> intermediateBroadcastStreamIds;
   private final Set<OperatorSpec> allOpSpecs;
   private final boolean hasWindowOrJoins;
 
@@ -54,7 +54,7 @@ public class OperatorSpecGraph implements Serializable {
   public OperatorSpecGraph(StreamApplicationDescriptorImpl streamAppDesc) {
     this.inputOperators = streamAppDesc.getInputOperators();
     this.outputStreams = streamAppDesc.getOutputStreams();
-    this.broadcastStreams = streamAppDesc.getBroadcastStreams();
+    this.intermediateBroadcastStreamIds = streamAppDesc.getIntermediateBroadcastStreamIds();
     this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
     this.hasWindowOrJoins = checkWindowOrJoins();
     this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
@@ -68,8 +68,8 @@ public class OperatorSpecGraph implements Serializable {
     return outputStreams;
   }
 
-  public Set<String> getBroadcastStreams() {
-    return broadcastStreams;
+  public Set<String> getIntermediateBroadcastStreamIds() {
+    return intermediateBroadcastStreamIds;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
index d889486..3b680fc 100644
--- a/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/application/descriptors/TestStreamApplicationDescriptorImpl.java
@@ -19,24 +19,27 @@
 package org.apache.samza.application.descriptors;
 
 import com.google.common.collect.ImmutableList;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ApplicationContainerContextFactory;
 import org.apache.samza.context.ApplicationTaskContextFactory;
+import org.apache.samza.operators.TableImpl;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.system.descriptors.ExpandingInputDescriptorProvider;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericOutputDescriptor;
-import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
-import org.apache.samza.table.descriptors.BaseTableDescriptor;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.system.descriptors.ExpandingInputDescriptorProvider;
+import org.apache.samza.system.descriptors.InputDescriptor;
 import org.apache.samza.system.descriptors.SystemDescriptor;
 import org.apache.samza.system.descriptors.TransformingInputDescriptorProvider;
 import org.apache.samza.system.descriptors.InputTransformer;
@@ -52,15 +55,13 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -70,11 +71,10 @@ import static org.mockito.Mockito.when;
  * Unit test for {@link StreamApplicationDescriptorImpl}
  */
 public class TestStreamApplicationDescriptorImpl {
-
   @Test
   public void testConstructor() {
     StreamApplication mockApp = mock(StreamApplication.class);
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = getConfig();
     StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(mockApp, mockConfig);
     verify(mockApp).describe(appDesc);
     assertEquals(mockConfig, appDesc.getConfig());
@@ -89,7 +89,7 @@ public class TestStreamApplicationDescriptorImpl {
     GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getInputStream(isd);
-      }, mock(Config.class));
+      }, getConfig());
 
     InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -112,7 +112,7 @@ public class TestStreamApplicationDescriptorImpl {
     GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getInputStream(isd);
-      }, mock(Config.class));
+      }, getConfig());
 
     InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -128,7 +128,7 @@ public class TestStreamApplicationDescriptorImpl {
     GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
     new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getInputStream(isd);
-      }, mock(Config.class));
+      }, getConfig());
   }
 
   @Test
@@ -140,7 +140,7 @@ public class TestStreamApplicationDescriptorImpl {
     MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getInputStream(isd);
-      }, mock(Config.class));
+      }, getConfig());
 
     InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -166,7 +166,7 @@ public class TestStreamApplicationDescriptorImpl {
     MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde());
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getInputStream(isd);
-      }, mock(Config.class));
+      }, getConfig());
 
     InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(expandedStreamId);
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -184,7 +184,7 @@ public class TestStreamApplicationDescriptorImpl {
     GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getInputStream(isd);
-      }, mock(Config.class));
+      }, getConfig());
 
     InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
     assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
@@ -203,7 +203,7 @@ public class TestStreamApplicationDescriptorImpl {
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getInputStream(isd1);
         appDesc.getInputStream(isd2);
-      }, mock(Config.class));
+      }, getConfig());
 
     InputOperatorSpec inputOpSpec1 = streamAppDesc.getInputOperators().get(streamId1);
     InputOperatorSpec inputOpSpec2 = streamAppDesc.getInputOperators().get(streamId2);
@@ -226,7 +226,7 @@ public class TestStreamApplicationDescriptorImpl {
         appDesc.getInputStream(isd1);
         // should throw exception
         appDesc.getInputStream(isd2);
-      }, mock(Config.class));
+      }, getConfig());
   }
 
   @Test
@@ -248,7 +248,7 @@ public class TestStreamApplicationDescriptorImpl {
           appDesc.getOutputStream(osd1);
           fail("adding output stream with the same system name but different SystemDescriptor should have failed");
         } catch (IllegalStateException e) { }
-      }, mock(Config.class));
+      }, getConfig());
 
     new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.withDefaultSystem(sd2);
@@ -256,7 +256,7 @@ public class TestStreamApplicationDescriptorImpl {
           appDesc.getInputStream(isd1);
           fail("Adding input stream with the same system name as the default system but different SystemDescriptor should have failed");
         } catch (IllegalStateException e) { }
-      }, mock(Config.class));
+      }, getConfig());
   }
 
   @Test
@@ -272,7 +272,7 @@ public class TestStreamApplicationDescriptorImpl {
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getOutputStream(osd);
-      }, mock(Config.class));
+      }, getConfig());
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
     assertEquals(streamId, outputStreamImpl.getStreamId());
@@ -288,7 +288,7 @@ public class TestStreamApplicationDescriptorImpl {
     GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
     new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getOutputStream(osd);
-      }, mock(Config.class));
+      }, getConfig());
   }
 
   @Test
@@ -300,7 +300,7 @@ public class TestStreamApplicationDescriptorImpl {
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getOutputStream(osd);
-      }, mock(Config.class));
+      }, getConfig());
 
     OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
     assertEquals(streamId, outputStreamImpl.getStreamId());
@@ -318,7 +318,7 @@ public class TestStreamApplicationDescriptorImpl {
     new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getInputStream(isd);
         appDesc.withDefaultSystem(sd); // should throw exception
-      }, mock(Config.class));
+      }, getConfig());
   }
 
   @Test(expected = IllegalStateException.class)
@@ -329,13 +329,13 @@ public class TestStreamApplicationDescriptorImpl {
     new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getOutputStream(osd);
         appDesc.withDefaultSystem(sd); // should throw exception
-      }, mock(Config.class));
+      }, getConfig());
   }
 
   @Test(expected = IllegalStateException.class)
   public void testSetDefaultSystemDescriptorAfterGettingIntermediateStream() {
     String streamId = "test-stream-1";
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
     streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
     streamAppDesc.withDefaultSystem(mock(SystemDescriptor.class)); // should throw exception
   }
@@ -349,13 +349,13 @@ public class TestStreamApplicationDescriptorImpl {
     new StreamApplicationDescriptorImpl(appDesc -> {
         appDesc.getOutputStream(osd1);
         appDesc.getOutputStream(osd2); // should throw exception
-      }, mock(Config.class));
+      }, getConfig());
   }
 
   @Test
   public void testGetIntermediateStreamWithValueSerde() {
     String streamId = "stream-1";
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
 
     Serde mockValueSerde = mock(Serde.class);
     IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
@@ -373,7 +373,7 @@ public class TestStreamApplicationDescriptorImpl {
   @Test
   public void testGetIntermediateStreamWithKeyValueSerde() {
     String streamId = "streamId";
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
 
     KVSerde mockKVSerde = mock(KVSerde.class);
     Serde mockKeySerde = mock(Serde.class);
@@ -394,7 +394,7 @@ public class TestStreamApplicationDescriptorImpl {
 
   @Test
   public void testGetIntermediateStreamWithDefaultSystemDescriptor() {
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = getConfig();
     String streamId = "streamId";
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
@@ -410,7 +410,7 @@ public class TestStreamApplicationDescriptorImpl {
 
   @Test(expected = NullPointerException.class)
   public void testGetIntermediateStreamWithNoSerde() {
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = getConfig();
     String streamId = "streamId";
 
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
@@ -420,7 +420,7 @@ public class TestStreamApplicationDescriptorImpl {
 
   @Test(expected = IllegalStateException.class)
   public void testGetSameIntermediateStreamTwice() {
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, getConfig());
     streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
     // should throw exception
     streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
@@ -428,11 +428,12 @@ public class TestStreamApplicationDescriptorImpl {
 
   @Test
   public void testGetNextOpIdIncrementsId() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+    HashMap<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "jobName");
+    configMap.put(JobConfig.JOB_ID(), "1234");
+    Config config = new MapConfig(configMap);
 
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
     assertEquals("jobName-1234-merge-0", streamAppDesc.getNextOpId(OpCode.MERGE, null));
     assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
     assertEquals("jobName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP, null));
@@ -440,22 +441,24 @@ public class TestStreamApplicationDescriptorImpl {
 
   @Test(expected = SamzaException.class)
   public void testGetNextOpIdRejectsDuplicates() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+    HashMap<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "jobName");
+    configMap.put(JobConfig.JOB_ID(), "1234");
+    Config config = new MapConfig(configMap);
 
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
     assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
     streamAppDesc.getNextOpId(OpCode.JOIN, "customName"); // should throw
   }
 
   @Test
   public void testOpIdValidation() {
-    Config mockConfig = mock(Config.class);
-    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
-    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+    HashMap<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "jobName");
+    configMap.put(JobConfig.JOB_ID(), "1234");
+    Config config = new MapConfig(configMap);
 
-    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
 
     // null and empty userDefinedIDs should fall back to autogenerated IDs.
     try {
@@ -487,7 +490,7 @@ public class TestStreamApplicationDescriptorImpl {
 
   @Test
   public void testGetInputStreamPreservesInsertionOrder() {
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = getConfig();
 
     String testStreamId1 = "test-stream-1";
     String testStreamId2 = "test-stream-2";
@@ -509,24 +512,25 @@ public class TestStreamApplicationDescriptorImpl {
 
   @Test
   public void testGetTable() throws Exception {
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = getConfig();
 
     BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
     TableSpec testTableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
     when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec);
     when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId());
     when(mockTableDescriptor.getSerde()).thenReturn(testTableSpec.getSerde());
+    AtomicReference<TableImpl> table = new AtomicReference<>();
     StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
-        appDesc.getTable(mockTableDescriptor);
+        table.set((TableImpl) appDesc.getTable(mockTableDescriptor));
       }, mockConfig);
-    assertNotNull(streamAppDesc.getTables().get(testTableSpec.getId()));
+    assertEquals(testTableSpec.getId(), table.get().getTableSpec().getId());
   }
 
   @Test
   public void testApplicationContainerContextFactory() {
     ApplicationContainerContextFactory factory = mock(ApplicationContainerContextFactory.class);
     StreamApplication testApp = appDesc -> appDesc.withApplicationContainerContextFactory(factory);
-    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
     assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.of(factory));
   }
 
@@ -534,7 +538,7 @@ public class TestStreamApplicationDescriptorImpl {
   public void testNoApplicationContainerContextFactory() {
     StreamApplication testApp = appDesc -> {
     };
-    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
     assertEquals(appSpec.getApplicationContainerContextFactory(), Optional.empty());
   }
 
@@ -542,7 +546,7 @@ public class TestStreamApplicationDescriptorImpl {
   public void testApplicationTaskContextFactory() {
     ApplicationTaskContextFactory factory = mock(ApplicationTaskContextFactory.class);
     StreamApplication testApp = appDesc -> appDesc.withApplicationTaskContextFactory(factory);
-    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
     assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.of(factory));
   }
 
@@ -550,7 +554,7 @@ public class TestStreamApplicationDescriptorImpl {
   public void testNoApplicationTaskContextFactory() {
     StreamApplication testApp = appDesc -> {
     };
-    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, getConfig());
     assertEquals(appSpec.getApplicationTaskContextFactory(), Optional.empty());
   }
 
@@ -558,13 +562,13 @@ public class TestStreamApplicationDescriptorImpl {
   public void testProcessorLifecycleListenerFactory() {
     ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
     StreamApplication testApp = appSpec -> appSpec.withProcessorLifecycleListenerFactory(mockFactory);
-    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, getConfig());
     assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
   }
 
   @Test(expected = IllegalStateException.class)
   public void testGetTableWithBadId() {
-    Config mockConfig = mock(Config.class);
+    Config mockConfig = getConfig();
     new StreamApplicationDescriptorImpl(appDesc -> {
         BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
         when(mockTableDescriptor.getTableId()).thenReturn("my.table");
@@ -572,6 +576,12 @@ public class TestStreamApplicationDescriptorImpl {
       }, mockConfig);
   }
 
+  private Config getConfig() {
+    HashMap<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-job");
+    return new MapConfig(configMap);
+  }
+
   class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> {
     public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) {
       super(systemName, "factory.class", null, expander);

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 6208206..f49958c 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -765,7 +765,7 @@ public class TestExecutionPlanner {
     when(taskAppDesc.getOutputStreamIds()).thenReturn(outputDescriptors.keySet());
     when(taskAppDesc.getTableDescriptors()).thenReturn(Collections.emptySet());
     when(taskAppDesc.getSystemDescriptors()).thenReturn(systemDescriptors);
-    when(taskAppDesc.getBroadcastStreams()).thenReturn(broadcastStreams);
+    when(taskAppDesc.getIntermediateBroadcastStreamIds()).thenReturn(broadcastStreams);
     doReturn(MockTaskApplication.class).when(taskAppDesc).getAppClass();
 
     Map<String, String> systemStreamConfigs = new HashMap<>();
@@ -796,7 +796,7 @@ public class TestExecutionPlanner {
     when(taskAppDesc.getOutputDescriptors()).thenReturn(new HashMap<>());
     when(taskAppDesc.getTableDescriptors()).thenReturn(new HashSet<>());
     when(taskAppDesc.getSystemDescriptors()).thenReturn(new HashSet<>());
-    when(taskAppDesc.getBroadcastStreams()).thenReturn(new HashSet<>());
+    when(taskAppDesc.getIntermediateBroadcastStreamIds()).thenReturn(new HashSet<>());
     doReturn(LegacyTaskApplication.class).when(taskAppDesc).getAppClass();
 
     Map<String, String> systemStreamConfigs = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/ef359972/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 6bbd674..2da5858 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -86,134 +86,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestOperatorImplGraph {
-  private void addOperatorRecursively(HashSet<OperatorImpl> s, OperatorImpl op) {
-    List<OperatorImpl> operators = new ArrayList<>();
-    operators.add(op);
-    while (!operators.isEmpty()) {
-      OperatorImpl opImpl = operators.remove(0);
-      s.add(opImpl);
-      if (!opImpl.registeredOperators.isEmpty()) {
-        operators.addAll(opImpl.registeredOperators);
-      }
-    }
-  }
-
-  static class TestMapFunction<M, OM> extends BaseTestFunction implements MapFunction<M, OM> {
-    final Function<M, OM> mapFn;
-
-    public TestMapFunction(String opId, Function<M, OM> mapFn) {
-      super(opId);
-      this.mapFn = mapFn;
-    }
-
-    @Override
-    public OM apply(M message) {
-      return this.mapFn.apply(message);
-    }
-  }
-
-  static class TestJoinFunction<K, M, JM, RM> extends BaseTestFunction implements JoinFunction<K, M, JM, RM> {
-    final BiFunction<M, JM, RM> joiner;
-    final Function<M, K> firstKeyFn;
-    final Function<JM, K> secondKeyFn;
-    final Collection<RM> joinResults = new HashSet<>();
-
-    public TestJoinFunction(String opId, BiFunction<M, JM, RM> joiner, Function<M, K> firstKeyFn, Function<JM, K> secondKeyFn) {
-      super(opId);
-      this.joiner = joiner;
-      this.firstKeyFn = firstKeyFn;
-      this.secondKeyFn = secondKeyFn;
-    }
-
-    @Override
-    public RM apply(M message, JM otherMessage) {
-      RM result = this.joiner.apply(message, otherMessage);
-      this.joinResults.add(result);
-      return result;
-    }
-
-    @Override
-    public K getFirstKey(M message) {
-      return this.firstKeyFn.apply(message);
-    }
-
-    @Override
-    public K getSecondKey(JM message) {
-      return this.secondKeyFn.apply(message);
-    }
-  }
-
-  static abstract class BaseTestFunction implements InitableFunction, ClosableFunction, Serializable {
-
-    static Map<TaskName, Map<String, BaseTestFunction>> perTaskFunctionMap = new HashMap<>();
-    static Map<TaskName, List<String>> perTaskInitList = new HashMap<>();
-    static Map<TaskName, List<String>> perTaskCloseList = new HashMap<>();
-    int numInitCalled = 0;
-    int numCloseCalled = 0;
-    TaskName taskName = null;
-    final String opId;
-
-    public BaseTestFunction(String opId) {
-      this.opId = opId;
-    }
-
-    static public void reset() {
-      perTaskFunctionMap.clear();
-      perTaskCloseList.clear();
-      perTaskInitList.clear();
-    }
-
-    static public BaseTestFunction getInstanceByTaskName(TaskName taskName, String opId) {
-      return perTaskFunctionMap.get(taskName).get(opId);
-    }
-
-    static public List<String> getInitListByTaskName(TaskName taskName) {
-      return perTaskInitList.get(taskName);
-    }
-
-    static public List<String> getCloseListByTaskName(TaskName taskName) {
-      return perTaskCloseList.get(taskName);
-    }
-
-    @Override
-    public void close() {
-      if (this.taskName == null) {
-        throw new IllegalStateException("Close called before init");
-      }
-      if (perTaskFunctionMap.get(this.taskName) == null || !perTaskFunctionMap.get(this.taskName).containsKey(opId)) {
-        throw new IllegalStateException("Close called before init");
-      }
-
-      if (perTaskCloseList.get(this.taskName) == null) {
-        perTaskCloseList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
-      } else {
-        perTaskCloseList.get(taskName).add(opId);
-      }
-
-      this.numCloseCalled++;
-    }
-
-    @Override
-    public void init(Context context) {
-      TaskName taskName = context.getTaskContext().getTaskModel().getTaskName();
-      if (perTaskFunctionMap.get(taskName) == null) {
-        perTaskFunctionMap.put(taskName, new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
-      } else {
-        if (perTaskFunctionMap.get(taskName).containsKey(opId)) {
-          throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, this.taskName.getTaskName()));
-        }
-        perTaskFunctionMap.get(taskName).put(opId, this);
-      }
-      if (perTaskInitList.get(taskName) == null) {
-        perTaskInitList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
-      } else {
-        perTaskInitList.get(taskName).add(opId);
-      }
-      this.taskName = taskName;
-      this.numInitCalled++;
-    }
-  }
-
   private Context context;
 
   @Before
@@ -357,6 +229,8 @@ public class TestOperatorImplGraph {
     String inputSystem = "input-system";
     String inputPhysicalName = "input-stream";
     HashMap<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-job");
+    configMap.put(JobConfig.JOB_ID(), "1");
     StreamTestUtils.addStreamConfigs(configMap, inputStreamId, inputSystem, inputPhysicalName);
     Config config = new MapConfig(configMap);
     when(this.context.getJobContext().getConfig()).thenReturn(config);
@@ -391,7 +265,7 @@ public class TestOperatorImplGraph {
         MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
         stream1.merge(Collections.singleton(stream2))
             .map(new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m));
-      }, mock(Config.class));
+      }, getConfig());
 
     TaskName mockTaskName = mock(TaskName.class);
     TaskModel taskModel = mock(TaskModel.class);
@@ -495,7 +369,6 @@ public class TestOperatorImplGraph {
     String inputStreamId1 = "input1";
     String inputStreamId2 = "input2";
     String inputSystem = "input-system";
-    Config mockConfig = mock(Config.class);
 
     TaskName mockTaskName = mock(TaskName.class);
     TaskModel taskModel = mock(TaskModel.class);
@@ -515,7 +388,7 @@ public class TestOperatorImplGraph {
 
         inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn))
             .map(new TestMapFunction<Object, Object>("4", mapFn));
-      }, mockConfig);
+      }, getConfig());
 
     OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), this.context, SystemClock.instance());
 
@@ -693,4 +566,138 @@ public class TestOperatorImplGraph {
     assertTrue(counts.get(int1) == 3);
     assertTrue(counts.get(int2) == 2);
   }
+
+  private void addOperatorRecursively(HashSet<OperatorImpl> s, OperatorImpl op) {
+    List<OperatorImpl> operators = new ArrayList<>();
+    operators.add(op);
+    while (!operators.isEmpty()) {
+      OperatorImpl opImpl = operators.remove(0);
+      s.add(opImpl);
+      if (!opImpl.registeredOperators.isEmpty()) {
+        operators.addAll(opImpl.registeredOperators);
+      }
+    }
+  }
+
+  private Config getConfig() {
+    HashMap<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-job");
+    configMap.put(JobConfig.JOB_ID(), "1");
+    return new MapConfig(configMap);
+  }
+
+  private static class TestMapFunction<M, OM> extends BaseTestFunction implements MapFunction<M, OM> {
+    final Function<M, OM> mapFn;
+
+    public TestMapFunction(String opId, Function<M, OM> mapFn) {
+      super(opId);
+      this.mapFn = mapFn;
+    }
+
+    @Override
+    public OM apply(M message) {
+      return this.mapFn.apply(message);
+    }
+  }
+
+  private static class TestJoinFunction<K, M, JM, RM> extends BaseTestFunction implements JoinFunction<K, M, JM, RM> {
+    final BiFunction<M, JM, RM> joiner;
+    final Function<M, K> firstKeyFn;
+    final Function<JM, K> secondKeyFn;
+    final Collection<RM> joinResults = new HashSet<>();
+
+    public TestJoinFunction(String opId, BiFunction<M, JM, RM> joiner, Function<M, K> firstKeyFn, Function<JM, K> secondKeyFn) {
+      super(opId);
+      this.joiner = joiner;
+      this.firstKeyFn = firstKeyFn;
+      this.secondKeyFn = secondKeyFn;
+    }
+
+    @Override
+    public RM apply(M message, JM otherMessage) {
+      RM result = this.joiner.apply(message, otherMessage);
+      this.joinResults.add(result);
+      return result;
+    }
+
+    @Override
+    public K getFirstKey(M message) {
+      return this.firstKeyFn.apply(message);
+    }
+
+    @Override
+    public K getSecondKey(JM message) {
+      return this.secondKeyFn.apply(message);
+    }
+  }
+
+  private static abstract class BaseTestFunction implements InitableFunction, ClosableFunction, Serializable {
+    static Map<TaskName, Map<String, BaseTestFunction>> perTaskFunctionMap = new HashMap<>();
+    static Map<TaskName, List<String>> perTaskInitList = new HashMap<>();
+    static Map<TaskName, List<String>> perTaskCloseList = new HashMap<>();
+    int numInitCalled = 0;
+    int numCloseCalled = 0;
+    TaskName taskName = null;
+    final String opId;
+
+    public BaseTestFunction(String opId) {
+      this.opId = opId;
+    }
+
+    static public void reset() {
+      perTaskFunctionMap.clear();
+      perTaskCloseList.clear();
+      perTaskInitList.clear();
+    }
+
+    static public BaseTestFunction getInstanceByTaskName(TaskName taskName, String opId) {
+      return perTaskFunctionMap.get(taskName).get(opId);
+    }
+
+    static public List<String> getInitListByTaskName(TaskName taskName) {
+      return perTaskInitList.get(taskName);
+    }
+
+    static public List<String> getCloseListByTaskName(TaskName taskName) {
+      return perTaskCloseList.get(taskName);
+    }
+
+    @Override
+    public void close() {
+      if (this.taskName == null) {
+        throw new IllegalStateException("Close called before init");
+      }
+      if (perTaskFunctionMap.get(this.taskName) == null || !perTaskFunctionMap.get(this.taskName).containsKey(opId)) {
+        throw new IllegalStateException("Close called before init");
+      }
+
+      if (perTaskCloseList.get(this.taskName) == null) {
+        perTaskCloseList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
+      } else {
+        perTaskCloseList.get(taskName).add(opId);
+      }
+
+      this.numCloseCalled++;
+    }
+
+    @Override
+    public void init(Context context) {
+      TaskName taskName = context.getTaskContext().getTaskModel().getTaskName();
+      if (perTaskFunctionMap.get(taskName) == null) {
+        perTaskFunctionMap.put(taskName, new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
+      } else {
+        if (perTaskFunctionMap.get(taskName).containsKey(opId)) {
+          throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, this.taskName.getTaskName()));
+        }
+        perTaskFunctionMap.get(taskName).put(opId, this);
+      }
+      if (perTaskInitList.get(taskName) == null) {
+        perTaskInitList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
+      } else {
+        perTaskInitList.get(taskName).add(opId);
+      }
+      this.taskName = taskName;
+      this.numInitCalled++;
+    }
+  }
 }