You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bo...@apache.org on 2018/09/26 00:23:04 UTC

[05/29] samza git commit: SAMZA-1840: Refactor TestRunner Apis to use StreamDescriptor and SystemDescriptor

SAMZA-1840: Refactor TestRunner Apis to use StreamDescriptor and SystemDescriptor

CollectionStream -> InMemoryInputDescriptor & InMemoryOutputDescriptor
CollectionStreamSystemSpec -> InMemorySystemDescriptor

Author: Sanil Jain <sn...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>, Cameron Lee <ca...@linkedin.com>

Closes #634 from Sanil15/SAMZA-1840


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1755268c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1755268c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1755268c

Branch: refs/heads/NewKafkaSystemConsumer
Commit: 1755268cff201663a41ca06e9dcc4602d41fc306
Parents: 3bb24c8
Author: Sanil Jain <sn...@linkedin.com>
Authored: Wed Sep 19 12:21:12 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Sep 19 12:21:12 2018 -0700

----------------------------------------------------------------------
 .../samza/example/PageViewCounterExample.java   |   1 -
 .../samza/test/framework/StreamAssert.java      |  73 ++---
 .../apache/samza/test/framework/TestRunner.java | 286 +++++++++----------
 .../test/framework/stream/CollectionStream.java | 204 -------------
 .../system/CollectionStreamSystemSpec.java      |  90 ------
 .../system/InMemoryInputDescriptor.java         |  42 +++
 .../system/InMemoryOutputDescriptor.java        |  46 +++
 .../system/InMemorySystemDescriptor.java        | 118 ++++++++
 .../AsyncStreamTaskIntegrationTest.java         | 108 ++++---
 .../StreamApplicationIntegrationTest.java       |  45 ++-
 .../framework/StreamTaskIntegrationTest.java    | 112 +++++---
 .../table/TestLocalTableWithSideInputs.java     |  32 ++-
 12 files changed, 573 insertions(+), 584 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index b540585..e2ebc93 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -53,7 +53,6 @@ public class PageViewCounterExample implements StreamApplication {
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
     PageViewCounterExample app = new PageViewCounterExample();
     ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
-
     runner.run();
     runner.waitForFinish();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
index 9972d7f..42379f3 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
@@ -22,9 +22,9 @@ package org.apache.samza.test.framework;
 import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.util.stream.Collectors;
-import org.apache.samza.test.framework.stream.CollectionStream;
 import java.util.List;
 import java.util.Map;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 
@@ -32,22 +32,24 @@ import static org.junit.Assert.assertThat;
 
 
 /**
- * Assertion utils non the content of a {@link CollectionStream}.
+ * Assertion utils on the content of a stream described by
+ * {@link org.apache.samza.operators.descriptors.base.stream.StreamDescriptor}.
  */
 public class StreamAssert {
   /**
-   * Util to assert  presence of messages in a stream with single partition in any order
+   * Verifies that the {@code expected} messages are present in any order in the single partition stream
+   * represented by {@code outputDescriptor}
    *
-   * @param collectionStream represents the actual stream which will be consumed to compare against expected list
-   * @param expected represents the expected stream of messages
+   * @param expected expected stream of messages
+   * @param outputDescriptor describes the stream which will be consumed to compare against expected list
    * @param timeout maximum time to wait for consuming the stream
-   * @param <M> represents the type of Message in the stream
+   * @param <StreamMessageType> type of messages in the stream
    * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages
    */
-  public static <M> void containsInAnyOrder(CollectionStream<M> collectionStream, final List<M> expected, Duration timeout)
-      throws InterruptedException {
-    Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream");
-    assertThat(TestRunner.consumeStream(collectionStream, timeout)
+  public static <StreamMessageType> void containsInAnyOrder(List<StreamMessageType> expected,
+      InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException {
+    Preconditions.checkNotNull(outputDescriptor);
+    assertThat(TestRunner.consumeStream(outputDescriptor, timeout)
         .entrySet()
         .stream()
         .flatMap(entry -> entry.getValue().stream())
@@ -55,19 +57,20 @@ public class StreamAssert {
   }
 
   /**
-   * Util to assert presence of messages in a stream with multiple partition in any order
+   * Verifies that the {@code expected} messages are present in any order in the multi partition stream
+   * represented by {@code outputDescriptor}
    *
-   * @param collectionStream represents the actual stream which will be consumed to compare against expected partition map
-   * @param expected represents a map of partitionId as key and list of messages in stream as value
+   * @param expected map of partitionId as key and list of messages in stream as value
+   * @param outputDescriptor describes the stream which will be consumed to compare against expected partition map
    * @param timeout maximum time to wait for consuming the stream
-   * @param <M> represents the type of Message in the stream
+   * @param <StreamMessageType>  type of messages in the stream
    * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages
    *
    */
-  public static <M> void containsInAnyOrder(CollectionStream<M> collectionStream, final Map<Integer, List<M>> expected,
-      Duration timeout) throws InterruptedException {
-    Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream");
-    Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, timeout);
+  public static <StreamMessageType> void containsInAnyOrder(Map<Integer, List<StreamMessageType>> expected,
+      InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException {
+    Preconditions.checkNotNull(outputDescriptor);
+    Map<Integer, List<StreamMessageType>> actual = TestRunner.consumeStream(outputDescriptor, timeout);
     for (Integer paritionId : expected.keySet()) {
       assertThat(actual.get(paritionId),
           IsIterableContainingInAnyOrder.containsInAnyOrder(expected.get(paritionId).toArray()));
@@ -75,18 +78,19 @@ public class StreamAssert {
   }
 
   /**
-   * Util to assert ordering of messages in a stream with single partition
+   * Verifies that the {@code expected} messages are present in order in the single partition stream
+   * represented by {@code outputDescriptor}
    *
-   * @param collectionStream represents the actual stream which will be consumed to compare against expected list
-   * @param expected represents the expected stream of messages
+   * @param expected  expected stream of messages
+   * @param outputDescriptor describes the stream which will be consumed to compare against expected list
    * @param timeout maximum time to wait for consuming the stream
-   * @param <M> represents the type of Message in the stream
+   * @param <StreamMessageType> type of messages in the stream
    * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages
    */
-  public static <M> void containsInOrder(CollectionStream<M> collectionStream, final List<M> expected, Duration timeout)
-      throws InterruptedException {
-    Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream");
-    assertThat(TestRunner.consumeStream(collectionStream, timeout)
+  public static <StreamMessageType> void containsInOrder(List<StreamMessageType> expected,
+      InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException {
+    Preconditions.checkNotNull(outputDescriptor);
+    assertThat(TestRunner.consumeStream(outputDescriptor, timeout)
         .entrySet()
         .stream()
         .flatMap(entry -> entry.getValue().stream())
@@ -94,18 +98,19 @@ public class StreamAssert {
   }
 
   /**
-   * Util to assert ordering of messages in a multi-partitioned stream
+   * Verifies that the {@code expected} messages are present in order in the multi partition stream
+   * represented by {@code outputDescriptor}
    *
-   * @param collectionStream represents the actual stream which will be consumed to compare against expected partition map
-   * @param expected represents a map of partitionId as key and list of messages as value
+   * @param expected map of partitionId as key and list of messages as value
+   * @param outputDescriptor describes the stream which will be consumed to compare against expected partition map
    * @param timeout maximum time to wait for consuming the stream
-   * @param <M> represents the type of Message in the stream
+   * @param <StreamMessageType> type of messages in the stream
    * @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages
    */
-  public static <M> void containsInOrder(CollectionStream<M> collectionStream, final Map<Integer, List<M>> expected,
-      Duration timeout) throws InterruptedException {
-    Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream");
-    Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, timeout);
+  public static <StreamMessageType> void containsInOrder(Map<Integer, List<StreamMessageType>> expected,
+      InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException {
+    Preconditions.checkNotNull(outputDescriptor);
+    Map<Integer, List<StreamMessageType>> actual = TestRunner.consumeStream(outputDescriptor, timeout);
     for (Integer paritionId : expected.keySet()) {
       assertThat(actual.get(paritionId), IsIterableContainingInOrder.contains(expected.get(paritionId).toArray()));
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 033bcdf..5c4ba3b 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -35,7 +35,6 @@ import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.InMemorySystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
@@ -62,58 +61,47 @@ import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
 import org.apache.samza.task.TaskFactory;
-import org.apache.samza.test.framework.stream.CollectionStream;
-import org.apache.samza.test.framework.system.CollectionStreamSystemSpec;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
 import org.junit.Assert;
 
 
 /**
- * TestRunner provides apis to quickly set up tests for Samza low level and high level apis. Default running mode
- * for test is Single container without any distributed coordination service. Test runner maintains global job config
- * {@code configs} that are used to run the Samza job
+ * TestRunner provides APIs to set up integration tests for a Samza application.
+ * Tests run in single container mode.
+ * The test runner sets the following configuration for the application.
  *
- * For single container mode following configs are set by default
+ * The following configs are set by default
  *  <ol>
  *    <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li>
  *    <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li>
  *    <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
  *    <li>"job.name" = "test-samza"</li>
  *    <li>"processor.id" = "1"</li>
- *    <li>"inmemory.scope = " Scope id generated to isolate the run for InMemorySystem</li>
  *  </ol>
  *
  */
 public class TestRunner {
-
-  private static final String JOB_NAME = "test-samza";
-  public enum Mode {
-    SINGLE_CONTAINER, MULTI_CONTAINER
-  }
+  public static final String JOB_NAME = "samza-test";
 
   private Map<String, String> configs;
-  private Map<String, CollectionStreamSystemSpec> systems;
   private Class taskClass;
   private StreamApplication app;
-  private String testId;
-  private SystemFactory factory;
-
-  /**
-   * Mode defines single or multi container running configuration, by default a single container configuration is assumed
+  /*
+   * inMemoryScope is a unique global key per TestRunner, this key when configured with {@link InMemorySystemDescriptor}
+   * provides an isolated state to run with in memory system
    */
-  private Mode mode;
+  private String inMemoryScope;
 
   private TestRunner() {
-    this.testId = RandomStringUtils.random(10, true, true);
-    this.systems = new HashMap<String, CollectionStreamSystemSpec>();
     this.configs = new HashMap<>();
-    this.mode = Mode.SINGLE_CONTAINER;
-    this.factory = new InMemorySystemFactory();
-    configs.put(InMemorySystemConfig.INMEMORY_SCOPE, testId);
+    this.inMemoryScope = RandomStringUtils.random(10, true, true);
     configs.put(JobConfig.JOB_NAME(), JOB_NAME);
-    configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1");
-    configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
-    configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
-    configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
   }
 
   /**
@@ -129,7 +117,7 @@ public class TestRunner {
 
   /**
    * Constructs a new {@link TestRunner} from following components
-   * @param app represent a class containing Samza job logic implementing {@link StreamApplication}
+   * @param app samza job implementing {@link StreamApplication}
    */
   private TestRunner(StreamApplication app) {
     this();
@@ -138,20 +126,9 @@ public class TestRunner {
   }
 
   /**
-   * Registers a system with TestRunner if not already registered and configures all the system configs to global
-   * job configs
-   */
-  private void registerSystem(String systemName) {
-    if (!systems.containsKey(systemName)) {
-      systems.put(systemName, CollectionStreamSystemSpec.create(systemName, JOB_NAME));
-      configs.putAll(systems.get(systemName).getSystemConfigs());
-    }
-  }
-
-  /**
    * Creates an instance of {@link TestRunner} for Low Level Samza Api
-   * @param taskClass represent a class extending either {@link StreamTask} or {@link AsyncStreamTask}
-   * @return a {@link TestRunner} for {@code taskClass}
+   * @param taskClass samza job extending either {@link StreamTask} or {@link AsyncStreamTask}
+   * @return this {@link TestRunner}
    */
   public static TestRunner of(Class taskClass) {
     Preconditions.checkNotNull(taskClass);
@@ -162,8 +139,8 @@ public class TestRunner {
 
   /**
    * Creates an instance of {@link TestRunner} for High Level/Fluent Samza Api
-   * @param app represent a class representing Samza job by implementing {@link StreamApplication}
-   * @return a {@link TestRunner} for {@code app}
+   * @param app samza job implementing {@link StreamApplication}
+   * @return this {@link TestRunner}
    */
   public static TestRunner of(StreamApplication app) {
     Preconditions.checkNotNull(app);
@@ -171,11 +148,11 @@ public class TestRunner {
   }
 
   /**
-   * Only adds a config from {@code config} to global {@code configs} if they dont exist in it.
-   * @param config represents the {@link Config} supposed to be added to global configs
-   * @return calling instance of {@link TestRunner} with added configs if they don't exist
+   * Adds each entry from {@code config} to the Samza job {@code configs} only if its key doesn't already exist there.
+   * @param config configs for the application
+   * @return this {@link TestRunner}
    */
-  public TestRunner addConfigs(Config config) {
+  public TestRunner addConfigs(Map<String, String> config) {
     Preconditions.checkNotNull(config);
     config.forEach(this.configs::putIfAbsent);
     return this;
@@ -186,7 +163,7 @@ public class TestRunner {
    * exisiting in {@code configs}
    * @param key key of the config
    * @param value value of the config
-   * @return calling instance of {@link TestRunner} with added config
+   * @return this {@link TestRunner}
    */
   public TestRunner addOverrideConfig(String key, String value) {
     Preconditions.checkNotNull(key);
@@ -197,94 +174,72 @@ public class TestRunner {
   }
 
   /**
-   * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs.
-   * <p>
-   * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with
-   * {@link TestRunner} if not registered already. Then it creates and initializes the stream partitions with messages for
-   * the registered System
-   * <p>
-   * @param stream represents the stream that is supposed to be configured with {@link TestRunner}
-   * @return calling instance of {@link TestRunner} with {@code stream} configured with it
+   * Adds the provided input stream with mock data to the test application.
+   *
+   * @param descriptor describes the stream that is supposed to be input to Samza application
+   * @param messages messages used to initialize the single partition stream
+   * @param <StreamMessageType> a message with null key or a KV {@link org.apache.samza.operators.KV}.
+   *                            key of KV represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or
+   *                           {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message
+   * @return this {@link TestRunner}
    */
-  public TestRunner addInputStream(CollectionStream stream) {
-    Preconditions.checkNotNull(stream);
-    registerSystem(stream.getSystemName());
-    initializeInput(stream);
-    stream.setTestId(testId);
-    if (configs.containsKey(TaskConfig.INPUT_STREAMS())) {
-      configs.put(TaskConfig.INPUT_STREAMS(),
-          configs.get(TaskConfig.INPUT_STREAMS()).concat("," + stream.getSystemName() + "." + stream.getPhysicalName()));
-    } else {
-      configs.put(TaskConfig.INPUT_STREAMS(), stream.getSystemName() + "." + stream.getPhysicalName());
-    }
-    stream.getStreamConfig().forEach((key, val) -> {
-        configs.putIfAbsent((String) key, (String) val);
-      });
-
+  public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor descriptor,
+      List<StreamMessageType> messages) {
+    Preconditions.checkNotNull(descriptor, messages);
+    Map<Integer, Iterable<StreamMessageType>> partitionData = new HashMap<Integer, Iterable<StreamMessageType>>();
+    partitionData.put(0, messages);
+    initializeInMemoryInputStream(descriptor, partitionData);
     return this;
   }
 
   /**
-   * Creates an in memory stream with {@link InMemorySystemFactory} and initializes the metadata for the stream.
-   * Initializes each partition of that stream with messages from {@code stream.getInitPartitions}
-   *
-   * @param stream represents the stream to initialize with the in memory system
-   * @param <T> can represent a message or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a
-   *            {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope}
-   *            and value represents the message
+   * Adds the provided input stream with mock data to the test application.
+   * @param descriptor describes the stream that is supposed to be input to Samza application
+   * @param messages map whose key is partitionId and value is messages in the partition
+   * @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}.
+   *                           A key of which represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or
+   *                           {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message
+   * @return this {@link TestRunner}
    */
-  private <T> void initializeInput(CollectionStream stream) {
-    Preconditions.checkNotNull(stream);
-    Preconditions.checkState(stream.getInitPartitions().size() >= 1);
-    String streamName = stream.getStreamName();
-    String systemName = stream.getSystemName();
-    Map<Integer, Iterable<T>> partitions = stream.getInitPartitions();
-    StreamSpec spec = new StreamSpec(streamName, stream.getPhysicalName(), systemName, partitions.size());
-    factory.getAdmin(systemName, new MapConfig(configs)).createStream(spec);
-    SystemProducer producer = factory.getProducer(systemName, new MapConfig(configs), null);
-    partitions.forEach((partitionId, partition) -> {
-        partition.forEach(e -> {
-            Object key = e instanceof KV ? ((KV) e).getKey() : null;
-            Object value = e instanceof KV ? ((KV) e).getValue() : e;
-            producer.send(systemName,
-                new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), key,
-                    value));
-          });
-        producer.send(systemName,
-            new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), null,
-                new EndOfStreamMessage(null)));
-      });
+  public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor descriptor,
+      Map<Integer, ? extends Iterable<StreamMessageType>> messages) {
+    Preconditions.checkNotNull(descriptor, messages);
+    Map<Integer, Iterable<StreamMessageType>> partitionData = new HashMap<Integer, Iterable<StreamMessageType>>();
+    partitionData.putAll(messages);
+    initializeInMemoryInputStream(descriptor, partitionData);
+    return this;
   }
 
   /**
-   * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs.
-   * <p>
-   * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with
-   * {@link TestRunner} if not registered already. Then it creates the stream partitions with the registered System
-   * <p>
-   * @param stream represents the stream that is supposed to be configured with {@link TestRunner}
-   * @return calling instance of {@link TestRunner} with {@code stream} configured with it
+   * Adds the provided output stream to the test application.
+   * @param streamDescriptor describes the stream that is supposed to be output for the Samza application
+   * @param partitionCount partition count of output stream
+   * @return this {@link TestRunner}
    */
-  public TestRunner addOutputStream(CollectionStream stream) {
-    Preconditions.checkNotNull(stream);
-    Preconditions.checkState(stream.getInitPartitions().size() >= 1);
-    registerSystem(stream.getSystemName());
-    stream.setTestId(testId);
-    StreamSpec spec = new StreamSpec(stream.getStreamName(), stream.getPhysicalName(), stream.getSystemName(), stream.getInitPartitions().size());
+  public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) {
+    Preconditions.checkNotNull(streamDescriptor);
+    Preconditions.checkState(partitionCount >= 1);
+    InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor();
+    imsd.withInMemoryScope(this.inMemoryScope);
+    Config config = new MapConfig(streamDescriptor.toConfig(), streamDescriptor.getSystemDescriptor().toConfig());
+    InMemorySystemFactory factory = new InMemorySystemFactory();
+    String physicalName = (String) streamDescriptor.getPhysicalName().orElse(streamDescriptor.getStreamId());
+    StreamSpec spec = new StreamSpec(streamDescriptor.getStreamId(), physicalName, streamDescriptor.getSystemName(),
+        partitionCount);
     factory
-        .getAdmin(stream.getSystemName(), new MapConfig(configs))
+        .getAdmin(streamDescriptor.getSystemName(), config)
         .createStream(spec);
-    configs.putAll(stream.getStreamConfig());
+    addConfigs(streamDescriptor.toConfig());
+    addConfigs(streamDescriptor.getSystemDescriptor().toConfig());
     return this;
   }
 
-
   /**
-   * Utility to run a test configured using TestRunner
+   * Run the application with the specified timeout
    *
-   * @param timeout time to wait for the high level application or low level task to finish. This timeout does not include
+   * @param timeout time to wait for the application to finish. This timeout does not include
    *                input stream initialization time or the assertion time over output streams. This timeout just accounts
-   *                for time that samza job takes run. Samza job won't be invoked with negative or zero timeout
+   *                for time that samza job takes to run. Timeout must be greater than 0.
    * @throws SamzaException if Samza job fails with exception and returns UnsuccessfulFinish as the statuscode
    */
   public void run(Duration timeout) {
@@ -301,34 +256,33 @@ public class TestRunner {
       throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable()));
     }
   }
+
   /**
-   * Utility to read the messages from a stream from the beginning, this is supposed to be used after executing the
-   * TestRunner in order to assert over the streams (ex output streams).
+   * Gets the contents of the output stream represented by {@code outputDescriptor} after {@link TestRunner#run(Duration)}
+   * has completed
    *
-   * @param stream represents {@link CollectionStream} whose current state of partitions is requested to be fetched
-   * @param timeout poll timeout in Ms
-   * @param <T> represents type of message
+   * @param outputDescriptor describes the stream to be consumed
+   * @param timeout maximum time to spend consuming the stream
+   * @param <StreamMessageType> type of message
    *
-   * @return a map key of which represents the {@code partitionId} and value represents the current state of the partition
-   *         i.e messages in the partition
-   * @throws InterruptedException Thrown when a blocking poll has been interrupted by another thread.
+   * @return a map whose key is {@code partitionId} and value is messages in partition
+   * @throws SamzaException when polling the stream is interrupted
    */
-  public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Duration timeout) throws InterruptedException {
-    Preconditions.checkNotNull(stream);
-    Preconditions.checkNotNull(stream.getSystemName());
-    String streamName = stream.getStreamName();
-    String systemName = stream.getSystemName();
+  public static <StreamMessageType> Map<Integer, List<StreamMessageType>> consumeStream(
+      InMemoryOutputDescriptor outputDescriptor, Duration timeout) throws SamzaException {
+    Preconditions.checkNotNull(outputDescriptor);
+    String streamId = outputDescriptor.getStreamId();
+    String systemName = outputDescriptor.getSystemName();
     Set<SystemStreamPartition> ssps = new HashSet<>();
-    Set<String> streamNames = new HashSet<>();
-    streamNames.add(streamName);
+    Set<String> streamIds = new HashSet<>();
+    streamIds.add(streamId);
     SystemFactory factory = new InMemorySystemFactory();
-    HashMap<String, String> config = new HashMap<>();
-    config.put(InMemorySystemConfig.INMEMORY_SCOPE, stream.getTestId());
-    Map<String, SystemStreamMetadata> metadata =
-        factory.getAdmin(systemName, new MapConfig(config)).getSystemStreamMetadata(streamNames);
-    SystemConsumer consumer = factory.getConsumer(systemName, new MapConfig(config), null);
-    metadata.get(stream.getPhysicalName()).getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
-        SystemStreamPartition temp = new SystemStreamPartition(systemName, streamName, partition);
+    Config config = new MapConfig(outputDescriptor.toConfig(), outputDescriptor.getSystemDescriptor().toConfig());
+    Map<String, SystemStreamMetadata> metadata = factory.getAdmin(systemName, config).getSystemStreamMetadata(streamIds);
+    SystemConsumer consumer = factory.getConsumer(systemName, config, null);
+    String name = (String) outputDescriptor.getPhysicalName().orElse(streamId);
+    metadata.get(name).getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
+        SystemStreamPartition temp = new SystemStreamPartition(systemName, streamId, partition);
         ssps.add(temp);
         consumer.register(temp, "0");
       });
@@ -337,12 +291,17 @@ public class TestRunner {
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>();
     HashSet<SystemStreamPartition> didNotReachEndOfStream = new HashSet<>(ssps);
     while (System.currentTimeMillis() < t + timeout.toMillis()) {
-      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = consumer.poll(ssps, 10);
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = null;
+      try {
+        currentState = consumer.poll(ssps, 10);
+      } catch (InterruptedException e) {
+        throw new SamzaException("Timed out while consuming stream \n" + e.getMessage());
+      }
       for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry : currentState.entrySet()) {
         SystemStreamPartition ssp = entry.getKey();
         output.computeIfAbsent(ssp, k -> new LinkedList<IncomingMessageEnvelope>());
         List<IncomingMessageEnvelope> currentBuffer = entry.getValue();
-        Integer totalMessagesToFetch = Integer.valueOf(metadata.get(stream.getStreamName())
+        Integer totalMessagesToFetch = Integer.valueOf(metadata.get(outputDescriptor.getStreamId())
             .getSystemStreamPartitionMetadata()
             .get(ssp.getPartition())
             .getNewestOffset());
@@ -364,7 +323,7 @@ public class TestRunner {
     return output.entrySet()
         .stream()
         .collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(),
-            entry -> entry.getValue().stream().map(e -> (T) e.getMessage()).collect(Collectors.toList())));
+            entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList())));
   }
 
   private TaskFactory createTaskFactory() {
@@ -388,4 +347,41 @@ public class TestRunner {
     throw new SamzaException(String.format("Not supported task.class %s. task.class has to implement either StreamTask "
         + "or AsyncStreamTask", taskClass.getName()));
   }
+
+  /**
+   * Creates an in memory stream with {@link InMemorySystemFactory} and feeds each of its partitions with messages
+   * @param partitonData key of the map represents partitionId and value represents
+   *                 messages in the partition
+   * @param descriptor describes a stream to initialize with the in memory system
+   */
+  private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor descriptor,
+      Map<Integer, Iterable<StreamMessageType>> partitonData) {
+    String systemName = descriptor.getSystemName();
+    String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
+    if (configs.containsKey(TaskConfig.INPUT_STREAMS())) {
+      configs.put(TaskConfig.INPUT_STREAMS(),
+          configs.get(TaskConfig.INPUT_STREAMS()).concat("," + systemName + "." + streamName));
+    } else {
+      configs.put(TaskConfig.INPUT_STREAMS(), systemName + "." + streamName);
+    }
+    InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor();
+    imsd.withInMemoryScope(this.inMemoryScope);
+    addConfigs(descriptor.toConfig());
+    addConfigs(descriptor.getSystemDescriptor().toConfig());
+    StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitonData.size());
+    SystemFactory factory = new InMemorySystemFactory();
+    Config config = new MapConfig(descriptor.toConfig(), descriptor.getSystemDescriptor().toConfig());
+    factory.getAdmin(systemName, config).createStream(spec);
+    SystemProducer producer = factory.getProducer(systemName, config, null);
+    SystemStream sysStream = new SystemStream(systemName, streamName);
+    partitonData.forEach((partitionId, partition) -> {
+        partition.forEach(e -> {
+            Object key = e instanceof KV ? ((KV) e).getKey() : null;
+            Object value = e instanceof KV ? ((KV) e).getValue() : e;
+            producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), key, value));
+          });
+        producer.send(systemName, new OutgoingMessageEnvelope(sysStream, Integer.valueOf(partitionId), null,
+          new EndOfStreamMessage(null)));
+      });
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
deleted file mode 100644
index 320a0ac..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.stream;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A CollectionStream represents an in memory stream of messages that can either have single or multiple partitions.
- * Every CollectionStream is coupled with a {@link org.apache.samza.test.framework.system.CollectionStreamSystemSpec} that
- * contains all the specification for system
- *<p>
- * When sending messages using {@code CollectionStream<KV<K, V>>}, messages use K as key and V as message
- * When sending messages using {@code CollectionStream<T>}, messages use a nullkey.
- *</p>
- * @param <T>
- *        can represent a message with null key or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a
- *        {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope}
- *        and value represents the message of the same
- */
-public class CollectionStream<T> {
-  private String testId;
-  private final String streamName;
-  private final String physicalName;
-  private final String systemName;
-  private Map<Integer, Iterable<T>> initPartitions;
-  private Map<String, String> streamConfig;
-  private static final String STREAM_TO_SYSTEM = "streams.%s.samza.system";
-  private static final String PHYSICAL_NAME = "streams.%s.samza.physical.name";
-
-  /**
-   * Constructs a new CollectionStream from specified components.
-   * @param systemName represents name of the system stream is associated with
-   * @param streamName represents name of the stream
-   */
-  private CollectionStream(String systemName, String streamName) {
-    Preconditions.checkNotNull(systemName);
-    Preconditions.checkNotNull(streamName);
-    this.systemName = systemName;
-    this.streamName = streamName;
-    this.streamConfig = new HashMap<>();
-    // TODO: Once SAMZA-1737 is resolved, generate a randomized physical name
-    this.physicalName = streamName;
-    streamConfig.put(String.format(STREAM_TO_SYSTEM, this.streamName), systemName);
-    streamConfig.put(String.format(PHYSICAL_NAME, this.streamName), physicalName);
-  }
-
-
-  /**
-   * Constructs a new CollectionStream with multiple empty partitions from specified components.
-   * @param systemName represents name of the system stream is associated with
-   * @param streamName represents name of the stream
-   * @param partitionCount represents number of partitions, each of these partitions will be empty
-   */
-  private CollectionStream(String systemName, String streamName, Integer partitionCount) {
-    this(systemName, streamName);
-    Preconditions.checkState(partitionCount > 0);
-    initPartitions = new HashMap<>();
-    for (int i = 0; i < partitionCount; i++) {
-      initPartitions.put(i, new ArrayList<>());
-    }
-  }
-
-  /**
-   * Constructs a new CollectionStream with single partition from specified components.
-   * @param systemName represents name of the system stream is associated with
-   * @param streamName represents name of the stream
-   * @param initPartition represents the messages that the stream will be intialized with, default partitionId for the
-   *                  this single partition stream is 0
-   */
-  private CollectionStream(String systemName, String streamName, Iterable<T> initPartition) {
-    this(systemName, streamName);
-    Preconditions.checkNotNull(initPartition);
-    initPartitions = new HashMap<>();
-    initPartitions.put(0, initPartition);
-  }
-
-  /**
-   * Constructs a new CollectionStream with multiple partitions from specified components.
-   * @param systemName represents name of the system stream is associated with
-   * @param streamName represents name of the stream
-   * @param initPartitions represents the partition state, key of the map represents partitionId and value represents
-   *                   the messages that partition will be initialized with
-   */
-  private CollectionStream(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> initPartitions) {
-    this(systemName, streamName);
-    Preconditions.checkNotNull(initPartitions);
-    this.initPartitions = new HashMap<>(initPartitions);
-  }
-
-  /**
-   * @return The Map of partitions that input stream is supposed to be initialized with, this method is
-   * used internally and should not be used for asserting over streams.
-   * The true state of stream is determined by {@code consmeStream()} of {@link org.apache.samza.test.framework.TestRunner}
-   */
-  public Map<Integer, Iterable<T>> getInitPartitions() {
-    return initPartitions;
-  }
-
-  public String getStreamName() {
-    return streamName;
-  }
-
-  public String getSystemName() {
-    return systemName;
-  }
-
-  public Map<String, String> getStreamConfig() {
-    return streamConfig;
-  }
-
-  public String getTestId() {
-    return testId;
-  }
-
-  public void setTestId(String testId) {
-    this.testId = testId;
-  }
-
-  public String getPhysicalName() {
-    return physicalName;
-  }
-
-  /**
-   * Creates an in memory stream with the name {@code streamName} and initializes the stream to only one partition
-   *
-   * @param systemName represents name of the system stream is associated with
-   * @param streamName represents the name of the Stream
-   * @param <T> represents the type of each message in a stream
-   * @return an {@link CollectionStream} with only one partition that can contain messages of the type
-   */
-  public static <T> CollectionStream<T> empty(String systemName, String streamName) {
-    return new CollectionStream<>(systemName, streamName, 1);
-  }
-
-  /**
-   * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions
-   * as specified by {@code partitionCount}. These partitions are empty and are supposed to be used by Samza job to produce
-   * messages to.
-   *
-   * @param systemName represents name of the system stream is associated with
-   * @param streamName represents the name of the Stream
-   * @param partitionCount represents the number of partitions the stream would have
-   * @param <T> represents the type of each message in a stream
-   * @return an empty {@link CollectionStream} with multiple partitions that can contain messages of the type {@code T}
-   */
-  public static <T> CollectionStream<T> empty(String systemName, String streamName, int partitionCount) {
-    return new CollectionStream<>(systemName, streamName, partitionCount);
-  }
-
-  /**
-   * Creates an in memory stream with the name {@code streamName}. Stream is created with single partition having
-   * {@code partitionId} is 0. This partition is intialzied with messages of type T
-   *
-   * @param systemName represents name of the system stream is associated with
-   * @param streamName represents the name of the Stream
-   * @param partition represents the messages that the {@link org.apache.samza.system.SystemStreamPartition} will be
-   *                  initialized with
-   * @param <T> represents the type of a message in the stream
-   * @return a {@link CollectionStream} with only one partition containing messages of the type {@code T}
-   *
-   */
-  public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> partition) {
-    return new CollectionStream<>(systemName, streamName, partition);
-  }
-
-  /**
-   * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions
-   * as the size of {@code partitions} map. Key of the map {@code partitions} represents the {@code partitionId} of
-   * each {@link org.apache.samza.Partition} for a {@link org.apache.samza.system.SystemStreamPartition} and value is
-   * an Iterable of messages that the {@link org.apache.samza.system.SystemStreamPartition} should be initialized with.
-   *
-   * @param systemName represents name of the system stream is associated with
-   * @param streamName represents the name of the Stream
-   * @param partitions Key of an entry in partitions represents a {@code partitionId} of a {@link org.apache.samza.Partition}
-   *                   and value represents the stream of messages the {@link org.apache.samza.system.SystemStreamPartition}
-   *                   will be initialized with
-   * @param <T> represents the type of a message in the stream
-   * @return a {@link CollectionStream} with multiple partitions each containing messages of the type {@code T}
-   *
-   */
-  public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> partitions) {
-    return new CollectionStream<>(systemName, streamName, partitions);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
deleted file mode 100644
index 5658f61..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.system;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.system.inmemory.InMemorySystemFactory;
-
-
-/**
- * CollectionStreamSystem represents a system that interacts with an underlying {@link InMemorySystemFactory} to create
- * various input and output streams and initialize {@link org.apache.samza.system.SystemStreamPartition} with messages
- * <p>
- * Following system level configs are set by default
- * <ol>
- *   <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
- *   <li>"jobs.job-name.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
- *   <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
- *   <li>"jobs.job-name.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
- * </ol>
- * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
- * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
- * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
- * scope have the highest precedence.
- */
-public class CollectionStreamSystemSpec {
-  private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s."; // prefix to override configs generated by the planner
-  private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
-  private static final String SYSTEM_OFFSET = "systems.%s.default.stream.samza.offset.default";
-
-  private String systemName;
-  private Map<String, String> systemConfigs;
-
-  /**
-   * Constructs a new CollectionStreamSystem from specified components.
-   * <p>
-   * Every {@link CollectionStreamSystemSpec} is assumed to consume from the oldest offset, since stream is in memory and
-   * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams.
-   * <p>
-   * @param systemName represents unique name of the system
-   */
-  private CollectionStreamSystemSpec(String systemName, String jobName) {
-    this.systemName = systemName;
-    systemConfigs = new HashMap<String, String>();
-    systemConfigs.put(String.format(SYSTEM_FACTORY, systemName), InMemorySystemFactory.class.getName());
-    systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_FACTORY, jobName, systemName), InMemorySystemFactory.class.getName());
-    systemConfigs.put(String.format(SYSTEM_OFFSET, systemName), "oldest");
-    systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_OFFSET, jobName, systemName), "oldest");
-  }
-
-  public String getSystemName() {
-    return systemName;
-  }
-
-  public Map<String, String> getSystemConfigs() {
-    return systemConfigs;
-  }
-
-  /**
-   * Creates a {@link CollectionStreamSystemSpec} with name {@code systemName}
-   * @param systemName represents name of the {@link CollectionStreamSystemSpec}
-   * @param jobName name of the job
-   * @return an instance of {@link CollectionStreamSystemSpec}
-   */
-  public static CollectionStreamSystemSpec create(String systemName, String jobName) {
-    Preconditions.checkState(StringUtils.isNotBlank(systemName));
-    Preconditions.checkState(StringUtils.isNotBlank(jobName));
-    return new CollectionStreamSystemSpec(systemName, jobName);
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
new file mode 100644
index 0000000..6065bf0
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+
+/**
+ * A descriptor for an in memory stream of messages that can have one or more partitions.
+ * <p>
+ *  An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ * <p>
+ * @param <StreamMessageType> type of messages in input stream
+ */
+public class InMemoryInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, InMemoryInputDescriptor<StreamMessageType>> {
+  /**
+   * Constructs a new InMemoryInputDescriptor from specified components.
+   * @param systemDescriptor descriptor of the system this stream is associated with
+   * @param streamId id of the stream
+   */
+  InMemoryInputDescriptor(String streamId, InMemorySystemDescriptor systemDescriptor) {
+    super(streamId, new NoOpSerde<>(), systemDescriptor, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
new file mode 100644
index 0000000..75fe7ae
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system;
+
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+
+/**
+ * A descriptor for an in memory output stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public class InMemoryOutputDescriptor<StreamMessageType>
+    extends OutputDescriptor<StreamMessageType, InMemoryOutputDescriptor<StreamMessageType>> {
+
+  /**
+   * Constructs an {@link InMemoryOutputDescriptor} instance.
+   * @param streamId id of the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  InMemoryOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+    super(streamId, new NoOpSerde<>(), systemDescriptor);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
new file mode 100644
index 0000000..92b23ef
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.InMemorySystemConfig;
+import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.test.framework.TestRunner;
+
+
+/**
+ * A descriptor for InMemorySystem.
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * <p>
+ * Following system level configs are set by default
+ * <ol>
+ *   <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+ *   <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+ *   <li>"inmemory.scope" = scope id generated to isolate the system in memory</li>
+ * </ol>
+ */
+public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+  private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName();
+  /**
+   * <p>
+   * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
+   * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
+   * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
+   * scope have the highest precedence.
+   *
+   * For this case, the following overriding configs are generated:
+   * <ol>
+   *      <li>"jobs.&lt;job-name&gt;.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+   *      <li>"jobs.&lt;job-name&gt;.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+   * </ol>
+   *
+   **/
+  private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s.";
+  private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default";
+
+  private String inMemoryScope;
+
+  /**
+   * Constructs a new InMemorySystemDescriptor from specified components.
+   * <p>
+   * Every {@link InMemorySystemDescriptor} is configured to consume from the oldest offset, since the stream is in
+   * memory and is used for testing. The system uses {@link InMemorySystemFactory} to initialize in memory streams.
+   * <p>
+   * @param systemName unique name of the system
+   */
+  public InMemorySystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+    this.withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
+  }
+
+  @Override
+  public <StreamMessageType> InMemoryInputDescriptor<StreamMessageType> getInputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new InMemoryInputDescriptor<StreamMessageType>(streamId, this);
+  }
+
+  @Override
+  public <StreamMessageType> InMemoryOutputDescriptor<StreamMessageType> getOutputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new InMemoryOutputDescriptor<StreamMessageType>(streamId, this);
+  }
+
+  /**
+   * {@code inMemoryScope} defines the unique instance of InMemorySystem that this system uses.
+   * This method is for framework use only; users are not supposed to call it.
+   *
+   * @param inMemoryScope acts as a unique global identifier for this instance of InMemorySystem
+   * @return this system descriptor
+   */
+  public InMemorySystemDescriptor withInMemoryScope(String inMemoryScope) {
+    this.inMemoryScope = inMemoryScope;
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> configs = new HashMap<>(super.toConfig());
+    configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
+    configs.put(String.format(CONFIG_OVERRIDE_PREFIX + JavaSystemConfig.SYSTEM_FACTORY_FORMAT, TestRunner.JOB_NAME, getSystemName()),
+        FACTORY_CLASS_NAME);
+    configs.put(
+        String.format(CONFIG_OVERRIDE_PREFIX + DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, TestRunner.JOB_NAME,
+            getSystemName()), SystemStreamMetadata.OffsetType.OLDEST.toString());
+    return configs;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index 3a1eba0..581b1c3 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -27,7 +27,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.samza.operators.KV;
-import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -40,16 +43,21 @@ public class AsyncStreamTaskIntegrationTest {
     List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
     List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
 
-    CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList);
-    CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+    InMemoryInputDescriptor<Integer> imid = isd
+        .getInputDescriptor("ints", new NoOpSerde<Integer>());
+
+    InMemoryOutputDescriptor imod = isd
+        .getOutputDescriptor("ints-out", new NoOpSerde<>());
 
     TestRunner
         .of(MyAsyncStreamTask.class)
-        .addInputStream(input)
-        .addOutputStream(output)
+        .addInputStream(imid, inputList)
+        .addOutputStream(imod, 1)
         .run(Duration.ofSeconds(2));
 
-    Assert.assertThat(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(0),
+    Assert.assertThat(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(0),
         IsIterableContainingInOrder.contains(outputList.toArray()));
   }
 
@@ -58,49 +66,70 @@ public class AsyncStreamTaskIntegrationTest {
     List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
     List<Integer> outputList = Arrays.asList(50, 10, 20, 30, 40);
 
-    CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList);
-    CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+    InMemoryInputDescriptor<Integer> imid = isd
+        .getInputDescriptor("ints", new NoOpSerde<Integer>());
+
+    InMemoryOutputDescriptor imod = isd
+        .getOutputDescriptor("ints-out", new NoOpSerde<>());
 
     TestRunner
         .of(MyAsyncStreamTask.class)
-        .addInputStream(input)
-        .addOutputStream(output)
+        .addInputStream(imid, inputList)
+        .addOutputStream(imod, 1)
         .run(Duration.ofSeconds(2));
 
-    StreamAssert.containsInAnyOrder(output, outputList, Duration.ofMillis(1000));
+    StreamAssert.containsInAnyOrder(outputList, imod, Duration.ofMillis(1000));
   }
 
   @Test
   public void testAsyncTaskWithMultiplePartition() throws Exception {
     Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
     Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
-    List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
-    List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
-    for (int i = 0; i < 5; i++) {
-      List<KV> keyedPartition = new ArrayList<>();
-      for (Integer val : partition) {
-        keyedPartition.add(KV.of(i, val));
-      }
-      inputPartitionData.put(i, keyedPartition);
-      expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
-    }
+    genData(inputPartitionData, expectedOutputPartitionData);
 
-    CollectionStream<KV> inputStream = CollectionStream.of("async-test", "ints", inputPartitionData);
-    CollectionStream outputStream = CollectionStream.empty("async-test", "ints-out", 5);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+    // Output descriptor is parameterized (not raw) so the compiler checks the asserted message type.
+    InMemoryInputDescriptor<KV> imid = isd
+        .getInputDescriptor("ints", new NoOpSerde<KV>());
+    InMemoryOutputDescriptor<Integer> imod = isd
+        .getOutputDescriptor("ints-out", new NoOpSerde<Integer>());
 
     TestRunner
         .of(MyAsyncStreamTask.class)
-        .addInputStream(inputStream)
-        .addOutputStream(outputStream)
+        .addInputStream(imid, inputPartitionData)
+        .addOutputStream(imod, 5)
         .run(Duration.ofSeconds(2));
 
-    StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000));
+    StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
   }
 
   @Test
   public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception {
     Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
     Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+    genData(inputPartitionData, expectedOutputPartitionData);
+
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+    InMemoryInputDescriptor<KV> imid = isd
+        .getInputDescriptor("ints", new NoOpSerde<>());
+    // Output descriptor is parameterized (not raw) so the compiler checks the asserted message type.
+    InMemoryOutputDescriptor<Integer> imod = isd
+        .getOutputDescriptor("ints-out", new NoOpSerde<Integer>());
+
+    TestRunner
+        .of(MyAsyncStreamTask.class)
+        .addInputStream(imid, inputPartitionData)
+        .addOutputStream(imod, 5)
+        .addOverrideConfig("task.max.concurrency", "4")
+        .run(Duration.ofSeconds(2));
+
+    StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
+  }
+
+  public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) {
     List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
     List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
     for (int i = 0; i < 5; i++) {
@@ -111,18 +140,6 @@ public class AsyncStreamTaskIntegrationTest {
       inputPartitionData.put(i, keyedPartition);
       expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
     }
-
-    CollectionStream<KV> inputStream = CollectionStream.of("async-test", "ints", inputPartitionData);
-    CollectionStream outputStream = CollectionStream.empty("async-test", "ints-out", 5);
-
-    TestRunner
-        .of(MyAsyncStreamTask.class)
-        .addInputStream(inputStream)
-        .addOutputStream(outputStream)
-        .addOverrideConfig("task.max.concurrency", "4")
-        .run(Duration.ofSeconds(2));
-
-    StreamAssert.containsInAnyOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000));
   }
 
   /**
@@ -130,15 +147,20 @@ public class AsyncStreamTaskIntegrationTest {
    */
   @Test(expected = AssertionError.class)
   public void testSamzaJobTimeoutFailureForAsyncTask() {
-    List<Integer> inputList = Arrays.asList(1, 2, 3, 4);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+    InMemoryInputDescriptor<Integer> imid = isd
+        .getInputDescriptor("ints", new NoOpSerde<>());
 
-    CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList);
-    CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+    InMemoryOutputDescriptor imod = isd
+        .getOutputDescriptor("ints-out", new NoOpSerde<>());
 
     TestRunner
         .of(MyAsyncStreamTask.class)
-        .addInputStream(input)
-        .addOutputStream(output)
+        .addInputStream(imid, Arrays.asList(1, 2, 3, 4))
+        .addOutputStream(imod, 1)
         .run(Duration.ofMillis(1));
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 1000f22..6dd9159 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -36,7 +36,9 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.kafka.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.controlmessages.TestData;
-import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -82,17 +84,22 @@ public class StreamApplicationIntegrationTest {
       pageviews.add(pv);
     }
 
-    CollectionStream<PageView> input = CollectionStream.of("test", "PageView", pageviews);
-    CollectionStream output = CollectionStream.empty("test", "Output", 10);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+    InMemoryInputDescriptor<PageView> imid = isd
+        .getInputDescriptor("PageView", new NoOpSerde<PageView>());
+
+    InMemoryOutputDescriptor<PageView> imod = isd
+        .getOutputDescriptor("Output", new NoOpSerde<PageView>());
 
     TestRunner
         .of(pageViewRepartition)
-        .addInputStream(input)
-        .addOutputStream(output)
+        .addInputStream(imid, pageviews)
+        .addOutputStream(imod, 10)
         .addOverrideConfig("job.default.system", "test")
         .run(Duration.ofMillis(1500));
 
-    Assert.assertEquals(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
+    Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
   }
 
   public static final class Values {
@@ -107,13 +114,18 @@ public class StreamApplicationIntegrationTest {
   @Test(expected = SamzaException.class)
   public void testSamzaJobStartMissingConfigFailureForStreamApplication() {
 
-    CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", new ArrayList<>());
-    CollectionStream output = CollectionStream.empty("test", "Output", 10);
+    // No "job.default.system" override is supplied to the runner in this test.
+    InMemorySystemDescriptor testSystem = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<PageView> pageViewInput =
+        testSystem.getInputDescriptor("PageView", new NoOpSerde<PageView>());
+
+    InMemoryOutputDescriptor<PageView> pageViewOutput =
+        testSystem.getOutputDescriptor("Output", new NoOpSerde<PageView>());
 
     TestRunner
         .of(pageViewRepartition)
-        .addInputStream(input)
-        .addOutputStream(output)
+        .addInputStream(pageViewInput, new ArrayList<>())
+        .addOutputStream(pageViewOutput, 10)
         .run(Duration.ofMillis(1000));
   }
 
@@ -131,12 +143,17 @@ public class StreamApplicationIntegrationTest {
       pageviews.add(new TestData.PageView(null, memberId));
     }
 
-    CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", pageviews);
-    CollectionStream output = CollectionStream.empty("test", "Output", 1);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+    InMemoryInputDescriptor<PageView> imid = isd
+        .getInputDescriptor("PageView", new NoOpSerde<PageView>());
+
+    InMemoryOutputDescriptor<PageView> imod = isd
+        .getOutputDescriptor("Output", new NoOpSerde<PageView>());
 
     TestRunner.of(pageViewFilter)
-        .addInputStream(input)
-        .addOutputStream(output)
+        .addInputStream(imid, pageviews)
+        .addOutputStream(imod, 10)
         .addOverrideConfig("job.default.system", "test")
         .run(Duration.ofMillis(1000));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index f888b4a..0580598 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -28,7 +28,10 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.KV;
-import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
 import org.hamcrest.collection.IsIterableContainingInOrder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -41,13 +44,23 @@ public class StreamTaskIntegrationTest {
     List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
     List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
 
-    CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList);
-    CollectionStream output = CollectionStream.empty("test", "output");
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
 
-    TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1));
+    InMemoryInputDescriptor<Integer> imid = isd
+        .getInputDescriptor("input", new NoOpSerde<Integer>());
 
-    Assert.assertThat(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(0),
+    InMemoryOutputDescriptor<Integer> imod = isd
+        .getOutputDescriptor("output", new NoOpSerde<Integer>());
+
+    TestRunner
+        .of(MyStreamTestTask.class)
+        .addInputStream(imid, inputList)
+        .addOutputStream(imod, 1)
+        .run(Duration.ofSeconds(1));
+
+    Assert.assertThat(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(0),
         IsIterableContainingInOrder.contains(outputList.toArray()));
+
   }
 
   /**
@@ -57,10 +70,19 @@ public class StreamTaskIntegrationTest {
   public void testSamzaJobFailureForSyncTask() {
     List<Double> inputList = Arrays.asList(1.2, 2.3, 3.33, 4.5);
 
-    CollectionStream<Double> input = CollectionStream.of("test", "doubles", inputList);
-    CollectionStream output = CollectionStream.empty("test", "output");
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
 
-    TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1));
+    InMemoryInputDescriptor<Double> imid = isd
+        .getInputDescriptor("doubles", new NoOpSerde<Double>());
+
+    InMemoryOutputDescriptor imod = isd
+        .getOutputDescriptor("output", new NoOpSerde<>());
+
+    TestRunner
+        .of(MyStreamTestTask.class)
+        .addInputStream(imid, inputList)
+        .addOutputStream(imod, 1)
+        .run(Duration.ofSeconds(1));
   }
 
   @Test
@@ -68,50 +90,72 @@ public class StreamTaskIntegrationTest {
     List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
     List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
 
-    CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList);
-    CollectionStream output = CollectionStream.empty("test", "output");
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+    InMemoryInputDescriptor<Integer> imid = isd
+        .getInputDescriptor("input", new NoOpSerde<Integer>());
+
+    InMemoryOutputDescriptor<Integer> imod = isd
+        .getOutputDescriptor("output", new NoOpSerde<Integer>());
 
     TestRunner
         .of(MyStreamTestTask.class)
-        .addInputStream(input)
-        .addOutputStream(output)
+        .addInputStream(imid, inputList)
+        .addOutputStream(imod, 1)
         .addOverrideConfig("job.container.thread.pool.size", "4")
         .run(Duration.ofSeconds(1));
 
-    StreamAssert.containsInOrder(output, outputList, Duration.ofMillis(1000));
+    StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000));
   }
 
   @Test
   public void testSyncTaskWithMultiplePartition() throws Exception {
     Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
     Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
-    List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
-    List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
-    for (int i = 0; i < 5; i++) {
-      List<KV> keyedPartition = new ArrayList<>();
-      for (Integer val : partition) {
-        keyedPartition.add(KV.of(i, val));
-      }
-      inputPartitionData.put(i, keyedPartition);
-      expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
-    }
+    genData(inputPartitionData, expectedOutputPartitionData);
+
+    // Input messages are KV pairs; the asserted output messages are Integers.
+    InMemorySystemDescriptor testSystem = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<KV> keyedInput =
+        testSystem.getInputDescriptor("input", new NoOpSerde<KV>());
 
-    CollectionStream<KV> inputStream = CollectionStream.of("test", "input", inputPartitionData);
-    CollectionStream outputStream = CollectionStream.empty("test", "output", 5);
+    InMemoryOutputDescriptor<Integer> intOutput =
+        testSystem.getOutputDescriptor("output", new NoOpSerde<Integer>());
 
     TestRunner
         .of(MyStreamTestTask.class)
-        .addInputStream(inputStream)
-        .addOutputStream(outputStream)
+        .addInputStream(keyedInput, inputPartitionData)
+        .addOutputStream(intOutput, 5)
         .run(Duration.ofSeconds(2));
 
-    StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000));
+    StreamAssert.containsInOrder(expectedOutputPartitionData, intOutput, Duration.ofMillis(1000));
   }
 
   @Test
   public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception {
     Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
     Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+    genData(inputPartitionData, expectedOutputPartitionData);
+
+    // Same fixture as the single-threaded case, run with job.container.thread.pool.size set to 4.
+    InMemorySystemDescriptor testSystem = new InMemorySystemDescriptor("test");
+    InMemoryInputDescriptor<KV> keyedInput =
+        testSystem.getInputDescriptor("input", new NoOpSerde<KV>());
+
+    InMemoryOutputDescriptor<Integer> intOutput =
+        testSystem.getOutputDescriptor("output", new NoOpSerde<Integer>());
+
+    TestRunner
+        .of(MyStreamTestTask.class)
+        .addInputStream(keyedInput, inputPartitionData)
+        .addOutputStream(intOutput, 5)
+        .addOverrideConfig("job.container.thread.pool.size", "4")
+        .run(Duration.ofSeconds(2));
+
+    StreamAssert.containsInOrder(expectedOutputPartitionData, intOutput, Duration.ofMillis(1000));
+  }
+
+  public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) {
     List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
     List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
     for (int i = 0; i < 5; i++) {
@@ -122,17 +166,5 @@ public class StreamTaskIntegrationTest {
       inputPartitionData.put(i, keyedPartition);
       expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
     }
-
-    CollectionStream<KV> inputStream = CollectionStream.of("test", "input", inputPartitionData);
-    CollectionStream outputStream = CollectionStream.empty("test", "output", 5);
-
-    TestRunner
-        .of(MyStreamTestTask.class)
-        .addInputStream(inputStream)
-        .addOutputStream(outputStream)
-        .addOverrideConfig("job.container.thread.pool.size", "4")
-        .run(Duration.ofSeconds(2));
-
-    StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 5c067ad..adcea48 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptor;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.JobConfig;
@@ -44,7 +45,9 @@ import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
 import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.table.Table;
 import org.apache.samza.test.framework.TestRunner;
-import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.junit.Test;
 
@@ -84,25 +87,28 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
     configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
     configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);
 
-    CollectionStream<PageView> pageViewStream =
-        CollectionStream.of(systemName, PAGEVIEW_STREAM, pageViews);
-    CollectionStream<Profile> profileStream =
-        CollectionStream.of(systemName, PROFILE_STREAM, profiles);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
 
-    CollectionStream<EnrichedPageView> outputStream =
-        CollectionStream.empty(systemName, ENRICHED_PAGEVIEW_STREAM);
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
+        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>());
+
+    InMemoryInputDescriptor<Profile> profileStreamDesc = isd
+        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>());
+
+    InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd
+        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>());
 
     TestRunner
         .of(app)
-        .addInputStream(pageViewStream)
-        .addInputStream(profileStream)
-        .addOutputStream(outputStream)
+        .addInputStream(pageViewStreamDesc, pageViews)
+        .addInputStream(profileStreamDesc, profiles)
+        .addOutputStream(outputStreamDesc, 1)
         .addConfigs(new MapConfig(configs))
         .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString())
         .run(Duration.ofMillis(100000));
 
     try {
-      Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStream, Duration.ofMillis(1000));
+      Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
       List<EnrichedPageView> results = result.values().stream()
           .flatMap(List::stream)
           .collect(Collectors.toList());
@@ -117,7 +123,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
       assertEquals("Mismatch between the expected and actual join count", results.size(),
           expectedEnrichedPageviews.size());
       assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
-    } catch (InterruptedException e) {
+    } catch (SamzaException e) {
       e.printStackTrace();
     }
   }
@@ -163,4 +169,4 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
             });
     }
   }
-}
+}
\ No newline at end of file