You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/07 06:36:12 UTC

[6/9] samza git commit: SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
new file mode 100644
index 0000000..db85e33
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.operators.functions.StreamExpander;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link StreamApplicationDescriptorImpl}
+ */
+public class TestStreamApplicationDescriptorImpl {
+
+  @Test
+  public void testConstructor() {
+    StreamApplication mockApp = mock(StreamApplication.class);
+    Config mockConfig = mock(Config.class);
+    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(mockApp, mockConfig);
+    verify(mockApp).describe(appDesc);
+    assertEquals(mockConfig, appDesc.config);
+  }
+
+  @Test
+  public void testGetInputStreamWithValueSerde() {
+
+    String streamId = "test-stream-1";
+    Serde mockValueSerde = mock(Serde.class);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd);
+      }, mock(Config.class));
+
+    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(streamId, inputOpSpec.getStreamId());
+    assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
+    assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test
+  public void testGetInputStreamWithKeyValueSerde() {
+
+    String streamId = "test-stream-1";
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mockKVSerde);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd);
+      }, mock(Config.class));
+
+    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(streamId, inputOpSpec.getStreamId());
+    assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
+    assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+    assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetInputStreamWithNullSerde() {
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericInputDescriptor isd = sd.getInputDescriptor("mockStreamId", null);
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd);
+      }, mock(Config.class));
+  }
+
+  @Test
+  public void testGetInputStreamWithTransformFunction() {
+    String streamId = "test-stream-1";
+    Serde mockValueSerde = mock(Serde.class);
+    InputTransformer transformer = ime -> ime;
+    MockTransformingSystemDescriptor sd = new MockTransformingSystemDescriptor("mockSystem", transformer);
+    MockInputDescriptor isd = sd.getInputDescriptor(streamId, mockValueSerde);
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd);
+      }, mock(Config.class));
+
+    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(streamId, inputOpSpec.getStreamId());
+    assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
+    assertEquals(transformer, inputOpSpec.getTransformer());
+  }
+
+  @Test
+  public void testGetInputStreamWithExpandingSystem() {
+    String streamId = "test-stream-1";
+    String expandedStreamId = "expanded-stream";
+    AtomicInteger expandCallCount = new AtomicInteger();
+    StreamExpander expander = (sg, isd) -> {
+      expandCallCount.incrementAndGet();
+      InputDescriptor expandedISD =
+          new GenericSystemDescriptor("expanded-system", "mockFactoryClass")
+              .getInputDescriptor(expandedStreamId, new IntegerSerde());
+
+      return sg.getInputStream(expandedISD);
+    };
+    MockExpandingSystemDescriptor sd = new MockExpandingSystemDescriptor("mock-system", expander);
+    MockInputDescriptor isd = sd.getInputDescriptor(streamId, new IntegerSerde());
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd);
+      }, mock(Config.class));
+
+    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(expandedStreamId);
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(1, expandCallCount.get());
+    assertFalse(streamAppDesc.getInputOperators().containsKey(streamId));
+    assertFalse(streamAppDesc.getInputDescriptors().containsKey(streamId));
+    assertTrue(streamAppDesc.getInputDescriptors().containsKey(expandedStreamId));
+    assertEquals(expandedStreamId, inputOpSpec.getStreamId());
+  }
+
+  @Test
+  public void testGetInputStreamWithRelaxedTypes() {
+    String streamId = "test-stream-1";
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd);
+      }, mock(Config.class));
+
+    InputOperatorSpec inputOpSpec = streamAppDesc.getInputOperators().get(streamId);
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(streamId, inputOpSpec.getStreamId());
+    assertEquals(isd, streamAppDesc.getInputDescriptors().get(streamId));
+  }
+
+  @Test
+  public void testMultipleGetInputStreams() {
+    String streamId1 = "test-stream-1";
+    String streamId2 = "test-stream-2";
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId1, mock(Serde.class));
+    GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId2, mock(Serde.class));
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd1);
+        appDesc.getInputStream(isd2);
+      }, mock(Config.class));
+
+    InputOperatorSpec inputOpSpec1 = streamAppDesc.getInputOperators().get(streamId1);
+    InputOperatorSpec inputOpSpec2 = streamAppDesc.getInputOperators().get(streamId2);
+
+    assertEquals(2, streamAppDesc.getInputOperators().size());
+    assertEquals(streamId1, inputOpSpec1.getStreamId());
+    assertEquals(streamId2, inputOpSpec2.getStreamId());
+    assertEquals(2, streamAppDesc.getInputDescriptors().size());
+    assertEquals(isd1, streamAppDesc.getInputDescriptors().get(streamId1));
+    assertEquals(isd2, streamAppDesc.getInputDescriptors().get(streamId2));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetSameInputStreamTwice() {
+    String streamId = "test-stream-1";
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericInputDescriptor isd1 = sd.getInputDescriptor(streamId, mock(Serde.class));
+    GenericInputDescriptor isd2 = sd.getInputDescriptor(streamId, mock(Serde.class));
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd1);
+        // should throw exception
+        appDesc.getInputStream(isd2);
+      }, mock(Config.class));
+  }
+
+  @Test
+  public void testMultipleSystemDescriptorForSameSystemName() {
+    GenericSystemDescriptor sd1 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericSystemDescriptor sd2 = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericInputDescriptor isd1 = sd1.getInputDescriptor("test-stream-1", mock(Serde.class));
+    GenericInputDescriptor isd2 = sd2.getInputDescriptor("test-stream-2", mock(Serde.class));
+    GenericOutputDescriptor osd1 = sd2.getOutputDescriptor("test-stream-3", mock(Serde.class));
+
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd1);
+        try {
+          appDesc.getInputStream(isd2);
+          fail("Adding input stream with the same system name but different SystemDescriptor should have failed");
+        } catch (IllegalStateException e) { }
+
+        try {
+          appDesc.getOutputStream(osd1);
+          fail("adding output stream with the same system name but different SystemDescriptor should have failed");
+        } catch (IllegalStateException e) { }
+      }, mock(Config.class));
+
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.withDefaultSystem(sd2);
+        try {
+          appDesc.getInputStream(isd1);
+          fail("Adding input stream with the same system name as the default system but different SystemDescriptor should have failed");
+        } catch (IllegalStateException e) { }
+      }, mock(Config.class));
+  }
+
+  @Test
+  public void testGetOutputStreamWithKeyValueSerde() {
+    String streamId = "test-stream-1";
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockKVSerde);
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getOutputStream(osd);
+      }, mock(Config.class));
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
+    assertEquals(osd, streamAppDesc.getOutputDescriptors().get(streamId));
+    assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetOutputStreamWithNullSerde() {
+    String streamId = "test-stream-1";
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, null);
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getOutputStream(osd);
+      }, mock(Config.class));
+  }
+
+  @Test
+  public void testGetOutputStreamWithValueSerde() {
+    String streamId = "test-stream-1";
+    Serde mockValueSerde = mock(Serde.class);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mockValueSerde);
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getOutputStream(osd);
+      }, mock(Config.class));
+
+    OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = streamAppDesc.getOutputStreams().get(streamId);
+    assertEquals(streamId, outputStreamImpl.getStreamId());
+    assertEquals(osd, streamAppDesc.getOutputDescriptors().get(streamId));
+    assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSystemDescriptorAfterGettingInputStream() {
+    String streamId = "test-stream-1";
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericInputDescriptor isd = sd.getInputDescriptor(streamId, mock(Serde.class));
+
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(isd);
+        appDesc.withDefaultSystem(sd); // should throw exception
+      }, mock(Config.class));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSystemDescriptorAfterGettingOutputStream() {
+    String streamId = "test-stream-1";
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericOutputDescriptor osd = sd.getOutputDescriptor(streamId, mock(Serde.class));
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getOutputStream(osd);
+        appDesc.withDefaultSystem(sd); // should throw exception
+      }, mock(Config.class));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetDefaultSystemDescriptorAfterGettingIntermediateStream() {
+    String streamId = "test-stream-1";
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+    streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
+    streamAppDesc.withDefaultSystem(mock(SystemDescriptor.class)); // should throw exception
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetSameOutputStreamTwice() {
+    String streamId = "test-stream-1";
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    GenericOutputDescriptor osd1 = sd.getOutputDescriptor(streamId, mock(Serde.class));
+    GenericOutputDescriptor osd2 = sd.getOutputDescriptor(streamId, mock(Serde.class));
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getOutputStream(osd1);
+        appDesc.getOutputStream(osd2); // should throw exception
+      }, mock(Config.class));
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithValueSerde() {
+    String streamId = "stream-1";
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+
+    Serde mockValueSerde = mock(Serde.class);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        streamAppDesc.getIntermediateStream(streamId, mockValueSerde, false);
+
+    assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
+    assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertTrue(((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+    assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithKeyValueSerde() {
+    String streamId = "streamId";
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+
+    KVSerde mockKVSerde = mock(KVSerde.class);
+    Serde mockKeySerde = mock(Serde.class);
+    Serde mockValueSerde = mock(Serde.class);
+    doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+    doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        streamAppDesc.getIntermediateStream(streamId, mockKVSerde, false);
+
+    assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
+    assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+    assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertEquals(mockKeySerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+    assertEquals(mockValueSerde, ((InputOperatorSpec) (OperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithDefaultSystemDescriptor() {
+    Config mockConfig = mock(Config.class);
+    String streamId = "streamId";
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mock-system", "mock-system-factory");
+    streamAppDesc.withDefaultSystem(sd);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
+
+    assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
+  }
+
+  @Test
+  public void testGetIntermediateStreamWithNoSerde() {
+    Config mockConfig = mock(Config.class);
+    String streamId = "streamId";
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+    IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+        streamAppDesc.getIntermediateStream(streamId, null, false);
+
+    assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStreamImpl.getOutputStream());
+    assertEquals(streamId, intermediateStreamImpl.getStreamId());
+    assertNull(intermediateStreamImpl.getOutputStream().getKeySerde());
+    assertNull(intermediateStreamImpl.getOutputStream().getValueSerde());
+    assertNull(((InputOperatorSpec) (OperatorSpec)  intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+    assertNull(((InputOperatorSpec) (OperatorSpec)  intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetSameIntermediateStreamTwice() {
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+    streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
+    // should throw exception
+    streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
+  }
+
+  @Test
+  public void testGetNextOpIdIncrementsId() {
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+    assertEquals("jobName-1234-merge-0", streamAppDesc.getNextOpId(OpCode.MERGE, null));
+    assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
+    assertEquals("jobName-1234-map-2", streamAppDesc.getNextOpId(OpCode.MAP, null));
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testGetNextOpIdRejectsDuplicates() {
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+    assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
+    streamAppDesc.getNextOpId(OpCode.JOIN, "customName"); // should throw
+  }
+
+  @Test
+  public void testOpIdValidation() {
+    Config mockConfig = mock(Config.class);
+    when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+    when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+
+    // null and empty userDefinedIDs should fall back to autogenerated IDs.
+    try {
+      streamAppDesc.getNextOpId(OpCode.FILTER, null);
+      streamAppDesc.getNextOpId(OpCode.FILTER, "");
+      streamAppDesc.getNextOpId(OpCode.FILTER, " ");
+      streamAppDesc.getNextOpId(OpCode.FILTER, "\t");
+    } catch (SamzaException e) {
+      fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
+    }
+
+    List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", "op_1", "OP_ID");
+    for (String validOpId: validOpIds) {
+      try {
+        streamAppDesc.getNextOpId(OpCode.FILTER, validOpId);
+      } catch (Exception e) {
+        fail("Received an exception with a valid operator ID: " + validOpId);
+      }
+    }
+
+    List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
+    for (String invalidOpId: invalidOpIds) {
+      try {
+        streamAppDesc.getNextOpId(OpCode.FILTER, invalidOpId);
+        fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
+      } catch (SamzaException e) { }
+    }
+  }
+
+  @Test
+  public void testGetInputStreamPreservesInsertionOrder() {
+    Config mockConfig = mock(Config.class);
+
+    String testStreamId1 = "test-stream-1";
+    String testStreamId2 = "test-stream-2";
+    String testStreamId3 = "test-stream-3";
+
+    GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getInputStream(sd.getInputDescriptor(testStreamId1, mock(Serde.class)));
+        appDesc.getInputStream(sd.getInputDescriptor(testStreamId2, mock(Serde.class)));
+        appDesc.getInputStream(sd.getInputDescriptor(testStreamId3, mock(Serde.class)));
+      }, mockConfig);
+
+    List<InputOperatorSpec> inputSpecs = new ArrayList<>(streamAppDesc.getInputOperators().values());
+    assertEquals(inputSpecs.size(), 3);
+    assertEquals(inputSpecs.get(0).getStreamId(), testStreamId1);
+    assertEquals(inputSpecs.get(1).getStreamId(), testStreamId2);
+    assertEquals(inputSpecs.get(2).getStreamId(), testStreamId3);
+  }
+
+  @Test
+  public void testGetTable() throws Exception {
+    Config mockConfig = mock(Config.class);
+
+    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
+    TableSpec testTableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
+    when(mockTableDescriptor.getTableSpec()).thenReturn(testTableSpec);
+    when(mockTableDescriptor.getTableId()).thenReturn(testTableSpec.getId());
+    StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+        appDesc.getTable(mockTableDescriptor);
+      }, mockConfig);
+    assertNotNull(streamAppDesc.getTables().get(testTableSpec));
+  }
+
+  @Test
+  public void testContextManager() {
+    ContextManager cntxMan = mock(ContextManager.class);
+    StreamApplication testApp = appDesc -> appDesc.withContextManager(cntxMan);
+    StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(appSpec.getContextManager(), cntxMan);
+  }
+
+  @Test
+  public void testProcessorLifecycleListenerFactory() {
+    ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
+    StreamApplication testApp = appSpec -> appSpec.withProcessorLifecycleListenerFactory(mockFactory);
+    StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+    assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetTableWithBadId() {
+    Config mockConfig = mock(Config.class);
+    new StreamApplicationDescriptorImpl(appDesc -> {
+        BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
+        when(mockTableDescriptor.getTableId()).thenReturn("my.table");
+        appDesc.getTable(mockTableDescriptor);
+      }, mockConfig);
+  }
+
+  class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> {
+    public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) {
+      super(systemName, "factory.class", null, expander);
+    }
+
+    @Override
+    public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
+      return new MockInputDescriptor<>(streamId, this, serde);
+    }
+  }
+
+  class MockTransformingSystemDescriptor extends SystemDescriptor<MockTransformingSystemDescriptor> implements TransformingInputDescriptorProvider<Integer> {
+    public MockTransformingSystemDescriptor(String systemName, InputTransformer transformer) {
+      super(systemName, "factory.class", transformer, null);
+    }
+
+    @Override
+    public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
+      return new MockInputDescriptor<>(streamId, this, serde);
+    }
+  }
+
+  public class MockInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> {
+    MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
+      super(streamId, serde, systemDescriptor, null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
new file mode 100644
index 0000000..9418c1f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.task.TaskFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit test for {@link TaskApplicationDescriptorImpl}
+ */
+public class TestTaskApplicationDescriptorImpl {
+
+  private Config config = mock(Config.class);
+  private String defaultSystemName = "test-system";
+  private SystemDescriptor defaultSystemDescriptor = mock(SystemDescriptor.class);
+  private List<InputDescriptor> mockInputs = new ArrayList<InputDescriptor>() { {
+      InputDescriptor mock1 = mock(InputDescriptor.class);
+      InputDescriptor mock2 = mock(InputDescriptor.class);
+      when(mock1.getStreamId()).thenReturn("test-input1");
+      when(mock2.getStreamId()).thenReturn("test-input2");
+      this.add(mock1);
+      this.add(mock2);
+    } };
+  private List<OutputDescriptor> mockOutputs = new ArrayList<OutputDescriptor>() { {
+      OutputDescriptor mock1 = mock(OutputDescriptor.class);
+      OutputDescriptor mock2 = mock(OutputDescriptor.class);
+      when(mock1.getStreamId()).thenReturn("test-output1");
+      when(mock2.getStreamId()).thenReturn("test-output2");
+      this.add(mock1);
+      this.add(mock2);
+    } };
+  private Set<TableDescriptor> mockTables = new HashSet<TableDescriptor>() { {
+      TableDescriptor mock1 = mock(TableDescriptor.class);
+      TableDescriptor mock2 = mock(TableDescriptor.class);
+      when(mock1.getTableId()).thenReturn("test-table1");
+      when(mock2.getTableId()).thenReturn("test-table2");
+      this.add(mock1);
+      this.add(mock2);
+    } };
+
+  @Before
+  public void setUp() {
+    when(defaultSystemDescriptor.getSystemName()).thenReturn(defaultSystemName);
+    mockInputs.forEach(isd -> when(isd.getSystemDescriptor()).thenReturn(defaultSystemDescriptor));
+    mockOutputs.forEach(osd -> when(osd.getSystemDescriptor()).thenReturn(defaultSystemDescriptor));
+  }
+
+  @Test
+  public void testConstructor() {
+    TaskApplication mockApp = mock(TaskApplication.class);
+    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(mockApp, config);
+    verify(mockApp).describe(appDesc);
+    assertEquals(config, appDesc.config);
+  }
+
+  @Test
+  public void testAddInputStreams() {
+    TaskApplication testApp = appDesc -> {
+      mockInputs.forEach(appDesc::addInputStream);
+    };
+    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+    assertEquals(mockInputs.toArray(), appDesc.getInputDescriptors().values().toArray());
+  }
+
+  @Test
+  public void testAddOutputStreams() {
+    TaskApplication testApp = appDesc -> {
+      mockOutputs.forEach(appDesc::addOutputStream);
+    };
+    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+    assertEquals(mockOutputs.toArray(), appDesc.getOutputDescriptors().values().toArray());
+  }
+
+  @Test
+  public void testAddTables() {
+    TaskApplication testApp = appDesc -> {
+      mockTables.forEach(appDesc::addTable);
+    };
+    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+    assertEquals(mockTables, appDesc.getTableDescriptors());
+  }
+
+  @Test
+  public void testSetTaskFactory() {
+    TaskFactory mockTf = mock(TaskFactory.class);
+    TaskApplication testApp = appDesc -> appDesc.setTaskFactory(mockTf);
+    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+    assertEquals(appDesc.getTaskFactory(), mockTf);
+  }
+
+  @Test
+  public void testContextManager() {
+    ContextManager cntxMan = mock(ContextManager.class);
+    TaskApplication testApp = appDesc -> {
+      appDesc.withContextManager(cntxMan);
+    };
+    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+    assertEquals(appDesc.getContextManager(), cntxMan);
+  }
+
+  @Test
+  public void testProcessorLifecycleListener() {
+    ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
+    TaskApplication testApp = appDesc -> {
+      appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+    };
+    TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+    assertEquals(appDesc.getProcessorLifecycleListenerFactory(), mockFactory);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 9912d8b..61cf6c5 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -28,13 +28,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
@@ -107,24 +107,24 @@ public class TestExecutionPlanner {
     };
   }
 
-  private StreamGraphSpec createSimpleGraph() {
+  private StreamApplicationDescriptorImpl createSimpleGraph() {
     /**
      * a simple graph of partitionBy and map
      *
      * input1 -> partitionBy -> map -> output1
      *
      */
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream(input1Descriptor);
-    OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor);
-    input1
-        .partitionBy(m -> m.key, m -> m.value, "p1")
-        .map(kv -> kv)
-        .sendTo(output1);
-    return graphSpec;
+    return new StreamApplicationDescriptorImpl(appDesc-> {
+        MessageStream<KV<Object, Object>> input1 = appDesc.getInputStream(input1Descriptor);
+        OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+        input1
+            .partitionBy(m -> m.key, m -> m.value, "p1")
+            .map(kv -> kv)
+            .sendTo(output1);
+      }, config);
   }
 
-  private StreamGraphSpec createStreamGraphWithJoin() {
+  private StreamApplicationDescriptorImpl createStreamGraphWithJoin() {
 
     /**
      * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
@@ -136,80 +136,77 @@ public class TestExecutionPlanner {
      * input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16)
      *
      */
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    MessageStream<KV<Object, Object>> messageStream1 =
-        graphSpec.getInputStream(input1Descriptor)
-            .map(m -> m);
-    MessageStream<KV<Object, Object>> messageStream2 =
-        graphSpec.getInputStream(input2Descriptor)
-            .partitionBy(m -> m.key, m -> m.value, "p1")
-            .filter(m -> true);
-    MessageStream<KV<Object, Object>> messageStream3 =
-        graphSpec.getInputStream(input3Descriptor)
-            .filter(m -> true)
-            .partitionBy(m -> m.key, m -> m.value, "p2")
-            .map(m -> m);
-    OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor);
-    OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream(output2Descriptor);
-
-    messageStream1
-        .join(messageStream2,
-            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
-        .sendTo(output1);
-    messageStream3
-        .join(messageStream2,
-            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
-        .sendTo(output2);
-
-    return graphSpec;
+    return new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<KV<Object, Object>> messageStream1 =
+            appDesc.getInputStream(input1Descriptor)
+                .map(m -> m);
+        MessageStream<KV<Object, Object>> messageStream2 =
+            appDesc.getInputStream(input2Descriptor)
+                .partitionBy(m -> m.key, m -> m.value, "p1")
+                .filter(m -> true);
+        MessageStream<KV<Object, Object>> messageStream3 =
+            appDesc.getInputStream(input3Descriptor)
+                .filter(m -> true)
+                .partitionBy(m -> m.key, m -> m.value, "p2")
+                .map(m -> m);
+        OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+        OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor);
+
+        messageStream1
+            .join(messageStream2,
+                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+            .sendTo(output1);
+        messageStream3
+            .join(messageStream2,
+                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+            .sendTo(output2);
+      }, config);
   }
 
-  private StreamGraphSpec createStreamGraphWithJoinAndWindow() {
-
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    MessageStream<KV<Object, Object>> messageStream1 =
-        graphSpec.getInputStream(input1Descriptor)
-            .map(m -> m);
-    MessageStream<KV<Object, Object>> messageStream2 =
-        graphSpec.getInputStream(input2Descriptor)
-            .partitionBy(m -> m.key, m -> m.value, "p1")
-            .filter(m -> true);
-    MessageStream<KV<Object, Object>> messageStream3 =
-        graphSpec.getInputStream(input3Descriptor)
-            .filter(m -> true)
-            .partitionBy(m -> m.key, m -> m.value, "p2")
-            .map(m -> m);
-    OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor);
-    OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream(output2Descriptor);
-
-    messageStream1.map(m -> m)
-        .filter(m->true)
-        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
-
-    messageStream2.map(m -> m)
-        .filter(m->true)
-        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
-
-    messageStream1
-        .join(messageStream2,
-            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
-        .sendTo(output1);
-    messageStream3
-        .join(messageStream2,
-            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
-        .sendTo(output2);
-    messageStream3
-        .join(messageStream2,
-            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
-        .sendTo(output2);
-
-    return graphSpec;
+  private StreamApplicationDescriptorImpl createStreamGraphWithJoinAndWindow() {
+
+    return new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<KV<Object, Object>> messageStream1 =
+            appDesc.getInputStream(input1Descriptor)
+                .map(m -> m);
+        MessageStream<KV<Object, Object>> messageStream2 =
+            appDesc.getInputStream(input2Descriptor)
+                .partitionBy(m -> m.key, m -> m.value, "p1")
+                .filter(m -> true);
+        MessageStream<KV<Object, Object>> messageStream3 =
+            appDesc.getInputStream(input3Descriptor)
+                .filter(m -> true)
+                .partitionBy(m -> m.key, m -> m.value, "p2")
+                .map(m -> m);
+        OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+        OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor);
+
+        messageStream1.map(m -> m)
+            .filter(m->true)
+            .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
+
+        messageStream2.map(m -> m)
+            .filter(m->true)
+            .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+
+        messageStream1
+            .join(messageStream2,
+                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
+            .sendTo(output1);
+        messageStream3
+            .join(messageStream2,
+                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
+            .sendTo(output2);
+        messageStream3
+            .join(messageStream2,
+                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
+            .sendTo(output2);
+      }, config);
   }
 
   @Before
@@ -265,7 +262,7 @@ public class TestExecutionPlanner {
   @Test
   public void testCreateProcessorGraph() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
 
     JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
     assertTrue(jobGraph.getSources().size() == 3);
@@ -276,7 +273,7 @@ public class TestExecutionPlanner {
   @Test
   public void testFetchExistingStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
     JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -294,7 +291,7 @@ public class TestExecutionPlanner {
   @Test
   public void testCalculateJoinInputPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
     JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -313,7 +310,7 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphSpec graphSpec = createSimpleGraph();
+    StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
     JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
     planner.calculatePartitions(jobGraph);
 
@@ -330,7 +327,7 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
     ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     for (JobConfig config : jobConfigs) {
@@ -345,7 +342,7 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphSpec graphSpec = createStreamGraphWithJoinAndWindow();
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoinAndWindow();
     ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
@@ -362,7 +359,7 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphSpec graphSpec = createStreamGraphWithJoinAndWindow();
+    StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoinAndWindow();
     ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
@@ -379,7 +376,7 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphSpec graphSpec = createSimpleGraph();
+    StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
     ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
@@ -394,7 +391,7 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraphSpec graphSpec = createSimpleGraph();
+    StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
     ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
     List<JobConfig> jobConfigs = plan.getJobConfigs();
     assertEquals(1, jobConfigs.size());
@@ -404,7 +401,7 @@ public class TestExecutionPlanner {
   @Test
   public void testCalculateIntStreamPartitions() throws Exception {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphSpec graphSpec = createSimpleGraph();
+    StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
     JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
 
     // the partitions should be the same as input1
@@ -437,11 +434,12 @@ public class TestExecutionPlanner {
     int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS;
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+    StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+        MessageStream<KV<Object, Object>> input1 = appDesc.getInputStream(input4Descriptor);
+        OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+        input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1);
+      }, config);
 
-    MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream(input4Descriptor);
-    OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor);
-    input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1);
     JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
 
     // the partitions should be the same as input1

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 960693f..ae6e25e 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -22,13 +22,13 @@ package org.apache.samza.execution;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -102,43 +102,44 @@ public class TestJobGraphJsonGenerator {
     when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
     StreamManager streamManager = new StreamManager(systemAdmins);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new NoOpSerde());
-    String mockSystemFactoryClass = "factory.class.name";
-    GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", mockSystemFactoryClass);
-    GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", mockSystemFactoryClass);
-    GenericInputDescriptor<KV<Object, Object>> input1Descriptor = system1.getInputDescriptor("input1", kvSerde);
-    GenericInputDescriptor<KV<Object, Object>> input2Descriptor = system2.getInputDescriptor("input2", kvSerde);
-    GenericInputDescriptor<KV<Object, Object>> input3Descriptor = system2.getInputDescriptor("input3", kvSerde);
-    GenericOutputDescriptor<KV<Object, Object>>  output1Descriptor = system1.getOutputDescriptor("output1", kvSerde);
-    GenericOutputDescriptor<KV<Object, Object>> output2Descriptor = system2.getOutputDescriptor("output2", kvSerde);
-
-    MessageStream<KV<Object, Object>> messageStream1 =
-        graphSpec.getInputStream(input1Descriptor)
-            .map(m -> m);
-    MessageStream<KV<Object, Object>> messageStream2 =
-        graphSpec.getInputStream(input2Descriptor)
-            .partitionBy(m -> m.key, m -> m.value, "p1")
-            .filter(m -> true);
-    MessageStream<KV<Object, Object>> messageStream3 =
-        graphSpec.getInputStream(input3Descriptor)
-            .filter(m -> true)
-            .partitionBy(m -> m.key, m -> m.value, "p2")
-            .map(m -> m);
-    OutputStream<KV<Object, Object>> outputStream1 = graphSpec.getOutputStream(output1Descriptor);
-    OutputStream<KV<Object, Object>> outputStream2 = graphSpec.getOutputStream(output2Descriptor);
-
-    messageStream1
-        .join(messageStream2,
-            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
-        .sendTo(outputStream1);
-    messageStream2.sink((message, collector, coordinator) -> { });
-    messageStream3
-        .join(messageStream2,
-            (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
-            mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
-        .sendTo(outputStream2);
+    StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+        KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new NoOpSerde());
+        String mockSystemFactoryClass = "factory.class.name";
+        GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", mockSystemFactoryClass);
+        GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", mockSystemFactoryClass);
+        GenericInputDescriptor<KV<Object, Object>> input1Descriptor = system1.getInputDescriptor("input1", kvSerde);
+        GenericInputDescriptor<KV<Object, Object>> input2Descriptor = system2.getInputDescriptor("input2", kvSerde);
+        GenericInputDescriptor<KV<Object, Object>> input3Descriptor = system2.getInputDescriptor("input3", kvSerde);
+        GenericOutputDescriptor<KV<Object, Object>>  output1Descriptor = system1.getOutputDescriptor("output1", kvSerde);
+        GenericOutputDescriptor<KV<Object, Object>> output2Descriptor = system2.getOutputDescriptor("output2", kvSerde);
+
+        MessageStream<KV<Object, Object>> messageStream1 =
+            appDesc.getInputStream(input1Descriptor)
+                .map(m -> m);
+        MessageStream<KV<Object, Object>> messageStream2 =
+            appDesc.getInputStream(input2Descriptor)
+                .partitionBy(m -> m.key, m -> m.value, "p1")
+                .filter(m -> true);
+        MessageStream<KV<Object, Object>> messageStream3 =
+            appDesc.getInputStream(input3Descriptor)
+                .filter(m -> true)
+                .partitionBy(m -> m.key, m -> m.value, "p2")
+                .map(m -> m);
+        OutputStream<KV<Object, Object>> outputStream1 = appDesc.getOutputStream(output1Descriptor);
+        OutputStream<KV<Object, Object>> outputStream2 = appDesc.getOutputStream(output2Descriptor);
+
+        messageStream1
+            .join(messageStream2,
+                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+            .sendTo(outputStream1);
+        messageStream2.sink((message, collector, coordinator) -> { });
+        messageStream3
+            .join(messageStream2,
+                (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+                mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+            .sendTo(outputStream2);
+      }, config);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
@@ -177,27 +178,28 @@ public class TestJobGraphJsonGenerator {
     when(systemAdmins.getSystemAdmin("kafka")).thenReturn(systemAdmin2);
     StreamManager streamManager = new StreamManager(systemAdmins);
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(config);
-    KVSerde<String, PageViewEvent> pvSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class));
-    GenericSystemDescriptor isd = new GenericSystemDescriptor("hdfs", "mockSystemFactoryClass");
-    GenericInputDescriptor<KV<String, PageViewEvent>> pageView = isd.getInputDescriptor("PageView", pvSerde);
-
-    KVSerde<String, Long> pvcSerde = KVSerde.of(new StringSerde(), new LongSerde());
-    GenericSystemDescriptor osd = new GenericSystemDescriptor("kafka", "mockSystemFactoryClass");
-    GenericOutputDescriptor<KV<String, Long>> pageViewCount = osd.getOutputDescriptor("PageViewCount", pvcSerde);
-
-    MessageStream<KV<String, PageViewEvent>> inputStream = graphSpec.getInputStream(pageView);
-    OutputStream<KV<String, Long>> outputStream = graphSpec.getOutputStream(pageViewCount);
-    inputStream
-        .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country")
-        .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(),
-            Duration.ofSeconds(10L),
-            () -> 0L,
-            (m, c) -> c + 1L,
-            new StringSerde(),
-            new LongSerde()), "count-by-country")
-        .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
-        .sendTo(outputStream);
+    StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+        KVSerde<String, PageViewEvent> pvSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class));
+        GenericSystemDescriptor isd = new GenericSystemDescriptor("hdfs", "mockSystemFactoryClass");
+        GenericInputDescriptor<KV<String, PageViewEvent>> pageView = isd.getInputDescriptor("PageView", pvSerde);
+
+        KVSerde<String, Long> pvcSerde = KVSerde.of(new StringSerde(), new LongSerde());
+        GenericSystemDescriptor osd = new GenericSystemDescriptor("kafka", "mockSystemFactoryClass");
+        GenericOutputDescriptor<KV<String, Long>> pageViewCount = osd.getOutputDescriptor("PageViewCount", pvcSerde);
+
+        MessageStream<KV<String, PageViewEvent>> inputStream = appDesc.getInputStream(pageView);
+        OutputStream<KV<String, Long>> outputStream = appDesc.getOutputStream(pageViewCount);
+        inputStream
+            .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country")
+            .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(),
+                Duration.ofSeconds(10L),
+                () -> 0L,
+                (m, c) -> c + 1L,
+                new StringSerde(),
+                new LongSerde()), "count-by-country")
+            .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
+            .sendTo(outputStream);
+      }, config);
 
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
     ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index 864c3fc..163b094 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -24,6 +24,7 @@ import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -31,7 +32,6 @@ import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -49,8 +49,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 public class TestJobNode {
 
@@ -66,23 +65,24 @@ public class TestJobNode {
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-    KVSerde<String, Object> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>());
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClass");
-    GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = sd.getInputDescriptor("input1", serde);
-    GenericInputDescriptor<KV<String, Object>> inputDescriptor2 = sd.getInputDescriptor("input2", serde);
-    GenericOutputDescriptor<KV<String, Object>> outputDescriptor = sd.getOutputDescriptor("output", serde);
-    MessageStream<KV<String, Object>> input1 = graphSpec.getInputStream(inputDescriptor1);
-    MessageStream<KV<String, Object>> input2 = graphSpec.getInputStream(inputDescriptor2);
-    OutputStream<KV<String, Object>> output = graphSpec.getOutputStream(outputDescriptor);
-    JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
-    input1
-        .partitionBy(KV::getKey, KV::getValue, serde, "p1")
-        .map(kv -> kv.value)
-        .join(input2.map(kv -> kv.value), mockJoinFn,
-            new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class),
-            Duration.ofHours(1), "j1")
-        .sendTo(output);
+    StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+        KVSerde<String, Object> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>());
+        GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClass");
+        GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = sd.getInputDescriptor("input1", serde);
+        GenericInputDescriptor<KV<String, Object>> inputDescriptor2 = sd.getInputDescriptor("input2", serde);
+        GenericOutputDescriptor<KV<String, Object>> outputDescriptor = sd.getOutputDescriptor("output", serde);
+        MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(inputDescriptor1);
+        MessageStream<KV<String, Object>> input2 = appDesc.getInputStream(inputDescriptor2);
+        OutputStream<KV<String, Object>> output = appDesc.getOutputStream(outputDescriptor);
+        JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
+        input1
+            .partitionBy(KV::getKey, KV::getValue, serde, "p1")
+            .map(kv -> kv.value)
+            .join(input2.map(kv -> kv.value), mockJoinFn,
+                new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class),
+                Duration.ofHours(1), "j1")
+            .sendTo(output);
+      }, mockConfig);
 
     JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig);
     Config config = new MapConfig();
@@ -188,12 +188,13 @@ public class TestJobNode {
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
     when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
 
-    StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
-    GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClassName");
-    GenericInputDescriptor<KV<String, Object>> inputDescriptor1 =
-        sd.getInputDescriptor("input", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
-    MessageStream<KV<String, Object>> input = graphSpec.getInputStream(inputDescriptor1);
-    input.partitionBy(KV::getKey, KV::getValue, "p1");
+    StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+        GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClassName");
+        GenericInputDescriptor<KV<String, Object>> inputDescriptor1 =
+            sd.getInputDescriptor("input", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+        MessageStream<KV<String, Object>> input = appDesc.getInputStream(inputDescriptor1);
+        input.partitionBy(KV::getKey, KV::getValue, "p1");
+      }, mockConfig);
 
     JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig);
     Config config = new MapConfig();

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
new file mode 100644
index 0000000..9ed57fa
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationUtilsFactory;
+import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link LocalJobPlanner}
+ *
+ * TODO: consolidate this with unit tests for ExecutionPlanner after SAMZA-1811
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LocalJobPlanner.class, JobCoordinatorConfig.class})
+public class TestLocalJobPlanner {
+
+  private static final String PLAN_JSON =
+      "{" + "\"jobs\":[{" + "\"jobName\":\"test-application\"," + "\"jobId\":\"1\"," + "\"operatorGraph\":{"
+          + "\"intermediateStreams\":{%s}," + "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}";
+  private static final String STREAM_SPEC_JSON_FORMAT =
+      "\"%s\":{" + "\"streamSpec\":{" + "\"id\":\"%s\"," + "\"systemName\":\"%s\"," + "\"physicalName\":\"%s\","
+          + "\"partitionCount\":2}," + "\"sourceJobs\":[\"test-app\"]," + "\"targetJobs\":[\"test-target-app\"]},";
+
+  private LocalJobPlanner localPlanner;
+
+  @Test
+  public void testStreamCreation()
+      throws Exception {
+    localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+    StreamManager streamManager = mock(StreamManager.class);
+    doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class));
+
+    ExecutionPlan plan = mock(ExecutionPlan.class);
+    when(plan.getIntermediateStreams()).thenReturn(
+        Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
+    when(plan.getPlanAsJson()).thenReturn("");
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(mock(JobConfig.class)));
+    doReturn(plan).when(localPlanner).getExecutionPlan(any());
+
+    CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
+    JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
+    when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
+    PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
+
+    localPlanner.prepareJobs();
+
+    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+    verify(streamManager).createStreams(captor.capture());
+    List<StreamSpec> streamSpecs = captor.getValue();
+    assertEquals(streamSpecs.size(), 1);
+    assertEquals(streamSpecs.get(0).getId(), "test-stream");
+    verify(streamManager).stop();
+  }
+
+  @Test
+  public void testStreamCreationWithCoordination()
+      throws Exception {
+    localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+    StreamManager streamManager = mock(StreamManager.class);
+    doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class));
+
+    ExecutionPlan plan = mock(ExecutionPlan.class);
+    when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
+    when(plan.getPlanAsJson()).thenReturn("");
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(mock(JobConfig.class)));
+    doReturn(plan).when(localPlanner).getExecutionPlan(any());
+
+    CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
+    CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
+    JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
+    when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
+    PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
+
+    DistributedLockWithState lock = mock(DistributedLockWithState.class);
+    when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);
+    when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock);
+    when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject()))
+        .thenReturn(coordinationUtils);
+
+    localPlanner.prepareJobs();
+
+    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+    verify(streamManager).createStreams(captor.capture());
+
+    List<StreamSpec> streamSpecs = captor.getValue();
+    assertEquals(streamSpecs.size(), 1);
+    assertEquals(streamSpecs.get(0).getId(), "test-stream");
+    verify(streamManager).stop();
+  }
+
+  /**
+   * A test case to verify if the plan results in different hash if there is change in topological sort order.
+   * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
+   */
+  @Test
+  public void testPlanIdWithShuffledStreamSpecs() {
+    List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-2", "stream-2", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+    String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+    List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-2", "stream-2", "testStream"),
+        new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+
+    assertFalse("Expected both of the latch ids to be different",
+        planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs)));
+  }
+
+  /**
+   * A test case to verify if the plan results in same hash in case of same plan.
+   * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
+   */
+  @Test
+  public void testGeneratePlanIdWithSameStreamSpecs() {
+    List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-2", "stream-2", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+    String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
+    String planIdForSecondAttempt = getExecutionPlanId(streamSpecs);
+
+    assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt);
+    assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt);
+  }
+
+  /**
+   * A test case to verify plan results in different hash in case of different intermediate stream.
+   * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
+   */
+  @Test
+  public void testGeneratePlanIdWithDifferentStreamSpecs() {
+    List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-2", "stream-2", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+    String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+    List<StreamSpec> updatedStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+        new StreamSpec("test-stream-4", "stream-4", "testStream"),
+        new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+
+    assertFalse("Expected both of the latch ids to be different",
+        planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
+  }
+
+  private LocalJobPlanner createLocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    return spy(new LocalJobPlanner(appDesc));
+  }
+
+  private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
+    String intermediateStreamJson =
+        updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(","));
+
+    int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();
+
+    return String.valueOf(planId);
+  }
+
+  private String streamSpecToJson(StreamSpec streamSpec) {
+    return String.format(STREAM_SPEC_JSON_FORMAT, streamSpec.getId(), streamSpec.getId(), streamSpec.getSystemName(),
+        streamSpec.getPhysicalName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
new file mode 100644
index 0000000..988fb34
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link RemoteJobPlanner}
+ *
+ * TODO: consolidate this with unit tests for ExecutionPlanner after SAMZA-1811
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RemoteJobPlanner.class)
+public class TestRemoteJobPlanner {
+
+  private RemoteJobPlanner remotePlanner;
+
+  @Test
+  public void testStreamCreation()
+      throws Exception {
+    remotePlanner = createRemoteJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+    StreamManager streamManager = mock(StreamManager.class);
+    doReturn(streamManager).when(remotePlanner).buildAndStartStreamManager(any(Config.class));
+
+    ExecutionPlan plan = mock(ExecutionPlan.class);
+    when(plan.getIntermediateStreams()).thenReturn(
+        Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
+    when(plan.getPlanAsJson()).thenReturn("");
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(mock(JobConfig.class)));
+    ApplicationConfig mockAppConfig = mock(ApplicationConfig.class);
+    when(mockAppConfig.getAppMode()).thenReturn(ApplicationConfig.ApplicationMode.STREAM);
+    when(plan.getApplicationConfig()).thenReturn(mockAppConfig);
+    doReturn(plan).when(remotePlanner).getExecutionPlan(any(), any());
+
+    remotePlanner.prepareJobs();
+
+    verify(streamManager, times(0)).clearStreamsFromPreviousRun(any());
+    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+    verify(streamManager).createStreams(captor.capture());
+    List<StreamSpec> streamSpecs = captor.getValue();
+    assertEquals(streamSpecs.size(), 1);
+    assertEquals(streamSpecs.get(0).getId(), "test-stream");
+    verify(streamManager).stop();
+  }
+
+  private RemoteJobPlanner createRemoteJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    return spy(new RemoteJobPlanner(appDesc));
+  }
+}