Posted to commits@samza.apache.org by ca...@apache.org on 2021/07/01 22:21:50 UTC

[samza] branch master updated: SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level (#1506)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c11067  SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level (#1506)
4c11067 is described below

commit 4c11067b0e2d7c6cb1f9c472c0843eea861a632e
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Thu Jul 1 15:21:43 2021 -0700

    SAMZA-2303: Exclude side inputs when handling end-of-stream and watermarks for high-level (#1506)
    
    Symptom: End-of-stream and watermarks are not properly propagated through Samza when side inputs are used.
    This prevents many tests from using the TestRunner framework, since TestRunner relies on tasks shutting themselves down based on end-of-stream messages. Being able to use TestRunner is helpful because it significantly decreases test times.
    
    Cause: OperatorImplGraph builds EndOfStreamStates and WatermarkStates objects with all of the input SSPs from the job model, which includes side-input SSPs. However, high-level operator tasks aren't given messages from side-input SSPs, so high-level operators should not need to handle end-of-stream and watermarks for those SSPs. The result is that end-of-stream and watermark handling tries to include side-inputs but never updates those states, which can result in tasks never exiting, since end-of-stream is never reached for the side-input SSPs.
    
    Changes:
    1. Pass the set of SSPs excluding side-inputs to high-level operators so that they don't read directly from the task model, which does include side-inputs. High-level operators will then handle end-of-stream and watermark propagation without considering side-input SSPs.
    2. Change InMemoryManager to only use IncomingMessageEnvelope.END_OF_STREAM_OFFSET when the taskName in the EndOfStreamMessage is null. This prevents the issue from SAMZA-2300, which causes end-of-stream messages to not get properly aggregated and then broadcast to all partitions (see SAMZA-2300 for more details). Some existing tests would fail without this change.
    3. Add a unique app.id in TestRunner for each test. This helps prevent clashes between different tests. For example, ControlMessageSender has a static cache keyed by the stream id of intermediate streams, and multiple tests could end up using the same key in that cache. By using a unique app id, the intermediate streams are unique, so multiple tests won't use the same key in the cache.
    
    API changes (impacts testing framework only):
    1. The default app.id used for tests executed by TestRunner is set to the "in-memory scope", which is a string that is randomly generated for each test. Before this change, the app.id was not set.
    2. InMemoryManager only uses IncomingMessageEnvelope.END_OF_STREAM_OFFSET when the EndOfStreamMessage has a null taskName. Before this change, InMemoryManager used IncomingMessageEnvelope.END_OF_STREAM_OFFSET for all EndOfStreamMessages.
    
    Upgrade/usage instructions:
    1. If tests are written using TestRunner, and those tests rely on app.id being unset, then they will need to be updated to use/read the new app.id. Tests relying on a specific app.id are not expected to be a common use case.
    2. If the in-memory system is being used (which includes tests written using TestRunner), and it is expected that the in-memory system sets END_OF_STREAM_OFFSET for messages when the taskName is non-null, then that usage will need to be removed. The taskName is intended for use by intermediate streams, so it shouldn't be used outside of Samza internals anyway.
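    
    For reference, a minimal sketch of the TestRunner usage pattern that these fixes unblock. MyApp is a hypothetical placeholder for any StreamApplication under test; the TestRunner and in-memory descriptor APIs are the ones exercised by the updated tests below. run() returns only once tasks shut themselves down on end-of-stream, which is exactly the propagation that side inputs were breaking:
    
        InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
        InMemoryInputDescriptor<PageView> input = isd.getInputDescriptor("pageview", new NoOpSerde<>());
        Map<Integer, List<PageView>> pageViews = TestTableData.generatePartitionedPageViews(20, 4);
        TestRunner.of(new MyApp())                  // MyApp: hypothetical StreamApplication
            .addInputStream(input, pageViews)       // one List<PageView> per input partition
            .run(Duration.ofSeconds(10));           // blocks until end-of-stream shuts the tasks down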
---
 .../apache/samza/context/InternalTaskContext.java  |  25 ++-
 .../org/apache/samza/context/TaskContextImpl.java  |  27 ++-
 .../samza/operators/impl/OperatorImplGraph.java    |  14 +-
 .../samza/system/inmemory/InMemoryManager.java     |  20 +-
 .../org/apache/samza/container/TaskInstance.scala  |   2 +-
 .../apache/samza/context/TestTaskContextImpl.java  |   2 +-
 .../samza/operators/impl/TestWindowOperator.java   |   7 +-
 .../samza/system/inmemory/TestInMemorySystem.java  |  20 +-
 .../apache/samza/test/framework/TestRunner.java    |   8 +
 .../samza/test/table/TestLocalTableEndToEnd.java   | 222 ++++++++++-----------
 .../TestLocalTableWithSideInputsEndToEnd.java      | 188 +++++++++++------
 .../org/apache/samza/test/table/TestTableData.java |  63 +++++-
 12 files changed, 393 insertions(+), 205 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
index 3d3a53d..804ef93 100644
--- a/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
+++ b/samza-core/src/main/java/org/apache/samza/context/InternalTaskContext.java
@@ -21,12 +21,24 @@ package org.apache.samza.context;
 
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
-public class InternalTaskContext {
 
+/**
+ * This class is used for passing objects around for the implementation of the high-level API.
+ * 1) Container for objects that need to be passed between different components in the implementation of the high-level
+ * API.
+ * 2) The implementation of the high-level API is built on top of the low-level API. The low-level API only exposes
+ * {@link TaskContext}, but the implementation of the high-level API needs some other internal Samza components (e.g.
+ * {@link StreamMetadataCache}). We internally make these components available through {@link TaskContextImpl} so that we
+ * can do a cast to access the components. This class hides some of the messiness of casting. It's still not ideal to
+ * need to do any casting, even in this class.
+ */
+public class InternalTaskContext {
   private final Context context;
   private final Map<String, Object> objectRegistry = new HashMap<>();
 
@@ -46,6 +58,10 @@ public class InternalTaskContext {
     return context;
   }
 
+  /**
+   * TODO: The public {@link JobContext} exposes {@link JobModel} now, so can this internal method be replaced by the
+   * public API?
+   */
   public JobModel getJobModel() {
     return ((TaskContextImpl) this.context.getTaskContext()).getJobModel();
   }
@@ -53,4 +69,11 @@ public class InternalTaskContext {
   public StreamMetadataCache getStreamMetadataCache() {
     return ((TaskContextImpl) this.context.getTaskContext()).getStreamMetadataCache();
   }
+
+  /**
+   * See {@link TaskContextImpl#getSspsExcludingSideInputs()}.
+   */
+  public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
+    return ((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs();
+  }
 }
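
A hedged sketch of the cast-hiding this class provides (the InternalTaskContext(Context) constructor is assumed from the `context` field above; it is not shown in this hunk):

    // Without InternalTaskContext, high-level internals would need casts like:
    JobModel jobModel = ((TaskContextImpl) context.getTaskContext()).getJobModel();
    // With it, the cast is centralized in one place:
    JobModel sameJobModel = new InternalTaskContext(context).getJobModel();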
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
index edec17d..d87a5bc 100644
--- a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
@@ -29,9 +29,15 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableManager;
 
+import java.util.Set;
 import java.util.function.Function;
 
 
+/**
+ * This class provides the implementation for the public {@link TaskContext} interface.
+ * It also allows us to pass certain internal Samza components around so that the implementation of the high-level API
+ * can use them (see InternalTaskContext for some more details).
+ */
 public class TaskContextImpl implements TaskContext {
   private final TaskModel taskModel;
   private final MetricsRegistry taskMetricsRegistry;
@@ -39,8 +45,13 @@ public class TaskContextImpl implements TaskContext {
   private final TableManager tableManager;
   private final CallbackScheduler callbackScheduler;
   private final OffsetManager offsetManager;
+
+  // The instance variables below are not used for implementing any public API methods. They are here so that we can
+  // pass some internal components over to the implementation of the high-level API. See InternalTaskContext.
+
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
+  private final Set<SystemStreamPartition> sspsExcludingSideInputs;
 
   public TaskContextImpl(TaskModel taskModel,
       MetricsRegistry taskMetricsRegistry,
@@ -49,7 +60,8 @@ public class TaskContextImpl implements TaskContext {
       CallbackScheduler callbackScheduler,
       OffsetManager offsetManager,
       JobModel jobModel,
-      StreamMetadataCache streamMetadataCache) {
+      StreamMetadataCache streamMetadataCache,
+      Set<SystemStreamPartition> sspsExcludingSideInputs) {
     this.taskModel = taskModel;
     this.taskMetricsRegistry = taskMetricsRegistry;
     this.keyValueStoreProvider = keyValueStoreProvider;
@@ -58,6 +70,7 @@ public class TaskContextImpl implements TaskContext {
     this.offsetManager = offsetManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
+    this.sspsExcludingSideInputs = sspsExcludingSideInputs;
   }
 
   @Override
@@ -101,4 +114,14 @@ public class TaskContextImpl implements TaskContext {
   public StreamMetadataCache getStreamMetadataCache() {
     return this.streamMetadataCache;
   }
-}
+
+  /**
+   * Returns the {@link SystemStreamPartition}s excluding the side-input SSPs. For the high-level API, watermarks and
+   * end-of-stream messages are propagated based on their input SSPs. However, the Samza framework does not give side
+   * input messages to the high-level operator tasks. Therefore, the operators need to know the input SSPs excluding the
+   * side input SSPs. See SAMZA-2303 for more details.
+   */
+  public Set<SystemStreamPartition> getSspsExcludingSideInputs() {
+    return this.sspsExcludingSideInputs;
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 705f0cb..19cec80 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -111,17 +111,13 @@ public class OperatorImplGraph {
       LOG.info("{} has {} producer tasks.", stream, count);
     });
 
-    // set states for end-of-stream
+    // set states for end-of-stream; don't include side inputs (see SAMZA-2303)
     internalTaskContext.registerObject(EndOfStreamStates.class.getName(),
-        new EndOfStreamStates(
-                internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
-                producerTaskCounts));
-    // set states for watermark
+        new EndOfStreamStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts));
+    // set states for watermark; don't include side inputs (see SAMZA-2303)
     internalTaskContext.registerObject(WatermarkStates.class.getName(),
-        new WatermarkStates(
-                internalTaskContext.getContext().getTaskContext().getTaskModel().getSystemStreamPartitions(),
-                producerTaskCounts,
-                context.getContainerContext().getContainerMetricsRegistry()));
+        new WatermarkStates(internalTaskContext.getSspsExcludingSideInputs(), producerTaskCounts,
+            context.getContainerContext().getContainerMetricsRegistry()));
 
     specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
       SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
diff --git a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
index 13ebf6e..4d44a86 100644
--- a/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
+++ b/samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
@@ -68,7 +68,7 @@ class InMemoryManager {
     List<IncomingMessageEnvelope> messages = bufferedMessages.get(ssp);
     String offset = String.valueOf(messages.size());
 
-    if (message instanceof EndOfStreamMessage) {
+    if (shouldUseEndOfStreamOffset(message)) {
       offset = IncomingMessageEnvelope.END_OF_STREAM_OFFSET;
     }
 
@@ -224,4 +224,22 @@ class InMemoryManager {
 
     return ImmutableList.copyOf(messageEnvelopesForSSP.subList(startingOffset, messageEnvelopesForSSP.size()));
   }
+
+  /**
+   * We don't always want to use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for all
+   * {@link EndOfStreamMessage}s. Certain control message flows (e.g. end-of-stream) have an aggregation partition,
+   * which needs to listen for messages from all other partitions. These aggregation messages are marked by the task
+   * name being non-null. If we use {@link IncomingMessageEnvelope#END_OF_STREAM_OFFSET} for the aggregation messages,
+   * then the aggregation partition would stop listening once it got the message from one of the tasks, but that means
+   * it would miss the aggregation messages from all other tasks. See SAMZA-2300 for more details.
+   * One other note: If there is a serializer set for the stream, then by the time the message gets to this check, it
+   * will be a byte array, so this check will not return true, even if the deserialized message was an
+   * {@link EndOfStreamMessage}. So far this isn't a problem, because we only really need this to return true for
+   * input streams (not intermediate streams), and in-memory input stream data doesn't get serialized. For intermediate
+   * streams, we don't need END_OF_STREAM_OFFSET to be used since the high-level operators take care of end-of-stream
+   * messages based on MessageType.
+   */
+  private static boolean shouldUseEndOfStreamOffset(Object message) {
+    return (message instanceof EndOfStreamMessage) && ((EndOfStreamMessage) message).getTaskName() == null;
+  }
 }
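
For concreteness, the two cases the new check distinguishes (both EndOfStreamMessage constructors are exercised in TestInMemorySystem below):

    EndOfStreamMessage perTaskEos = new EndOfStreamMessage("task 0"); // aggregation message, taskName != null
    EndOfStreamMessage finalEos = new EndOfStreamMessage();           // broadcast message, taskName == null
    // InMemoryManager assigns IncomingMessageEnvelope.END_OF_STREAM_OFFSET only to finalEos;
    // perTaskEos keeps a normal offset, so the aggregation partition keeps consuming per-task messages.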
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 2ebe465..801c6bc 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -80,7 +80,7 @@ class TaskInstance(
       }
     })
   private val taskContext = new TaskContextImpl(taskModel, metrics.registry, kvStoreSupplier, tableManager,
-    new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache)
+    new CallbackSchedulerImpl(epochTimeScheduler), offsetManager, jobModel, streamMetadataCache, systemStreamPartitions)
   // need separate field for this instead of using it through Context, since Context throws an exception if it is null
   private val applicationTaskContextOption = applicationTaskContextFactoryOption
     .map(_.create(externalContextOption.orNull, jobContext, containerContext, taskContext,
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
index 0e8f78e..094583e 100644
--- a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
@@ -62,7 +62,7 @@ public class TestTaskContextImpl {
     MockitoAnnotations.initMocks(this);
     taskContext =
         new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler,
-            offsetManager, null, null);
+            offsetManager, null, null, null);
     when(this.taskModel.getTaskName()).thenReturn(TASK_NAME);
   }
 
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 76b79a7..8218720 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -36,6 +36,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
+import org.apache.samza.context.TaskContextImpl;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
 import org.apache.samza.system.descriptors.GenericSystemDescriptor;
 import org.apache.samza.job.model.TaskModel;
@@ -93,11 +94,13 @@ public class TestWindowOperator {
     Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
     Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
 
+    SystemStreamPartition ssp = new SystemStreamPartition("kafka", "integers", new Partition(0));
     TaskModel taskModel = mock(TaskModel.class);
-    when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet
-        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+    when(taskModel.getSystemStreamPartitions()).thenReturn(ImmutableSet.of(ssp));
     when(taskModel.getTaskName()).thenReturn(new TaskName("task 1"));
     when(this.context.getTaskContext().getTaskModel()).thenReturn(taskModel);
+    when(((TaskContextImpl) this.context.getTaskContext()).getSspsExcludingSideInputs()).thenReturn(
+        ImmutableSet.of(ssp));
     when(this.context.getTaskContext().getTaskMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(this.context.getContainerContext().getContainerMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     when(this.context.getTaskContext().getStore("jobName-jobId-window-w1"))
diff --git a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
index 0a2e221..9439b01 100644
--- a/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
+++ b/samza-core/src/test/java/org/apache/samza/system/inmemory/TestInMemorySystem.java
@@ -127,7 +127,7 @@ public class TestInMemorySystem {
   }
 
   @Test
-  public void testEndOfStreamMessage() {
+  public void testEndOfStreamMessageWithTask() {
     EndOfStreamMessage eos = new EndOfStreamMessage("test-task");
 
     produceMessages(eos);
@@ -139,6 +139,24 @@ public class TestInMemorySystem {
     List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);
 
     assertEquals(1, results.size());
+    assertEquals("test-task", ((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
+    assertFalse(results.get(0).isEndOfStream());
+  }
+
+  @Test
+  public void testEndOfStreamMessageWithoutTask() {
+    EndOfStreamMessage eos = new EndOfStreamMessage();
+
+    produceMessages(eos);
+
+    Set<SystemStreamPartition> sspsToPoll = IntStream.range(0, PARTITION_COUNT)
+        .mapToObj(partition -> new SystemStreamPartition(SYSTEM_STREAM, new Partition(partition)))
+        .collect(Collectors.toSet());
+
+    List<IncomingMessageEnvelope> results = consumeRawMessages(sspsToPoll);
+
+    assertEquals(1, results.size());
+    assertNull(((EndOfStreamMessage) results.get(0).getMessage()).getTaskName());
     assertTrue(results.get(0).isEndOfStream());
   }
 
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index fb65eea..943a8ca 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -107,6 +107,14 @@ public class TestRunner {
     this.configs = new HashMap<>();
     this.inMemoryScope = RandomStringUtils.random(10, true, true);
     configs.put(ApplicationConfig.APP_NAME, APP_NAME);
+    /*
+     * Use a unique app id to help make sure a test execution is isolated from others.
+     * A concrete example of where this helps is to avoid an issue with ControlMessageSender. It has a static cache
+     * keyed by stream id to save partition counts for intermediate streams. This means that different tests can
+     * collide in this cache if they use the same intermediate stream names. Having a unique app id makes the
+     * intermediate streams unique across tests.
+     */
+    configs.put(ApplicationConfig.APP_ID, this.inMemoryScope);
     configs.put(JobConfig.PROCESSOR_ID, "1");
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(JobConfig.STARTPOINT_METADATA_STORE_FACTORY, InMemoryMetadataStoreFactory.class.getCanonicalName());
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
index 78fc7b5..ea81f13 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.table;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -27,10 +28,8 @@ import java.util.Map;
 
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.context.Context;
@@ -39,7 +38,6 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -47,11 +45,10 @@ import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
-import org.apache.samza.test.harness.IntegrationTestHarness;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
 import org.apache.samza.test.util.ArraySystemFactory;
-import org.apache.samza.test.util.Base64Serializer;
-
-import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
@@ -62,50 +59,42 @@ import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 
 
 /**
  * This test class tests sendTo() and join() for local tables
  */
-public class TestLocalTableEndToEnd extends IntegrationTestHarness {
+public class TestLocalTableEndToEnd {
+  private static final String SYSTEM_NAME = "test";
+  private static final String PAGEVIEW_STREAM = "pageview";
+  private static final String PROFILE_STREAM = "profile";
 
   @Test
-  public void testSendTo() throws Exception {
-
-    int count = 10;
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
-    int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
+  public void testSendTo() {
     MyMapFunction mapFn = new MyMapFunction();
+    StreamApplication app = appDesc -> {
+      Table<KV<Integer, Profile>> table =
+          appDesc.getTable(new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
 
-    final StreamApplication app = appDesc -> {
-
-      Table<KV<Integer, Profile>> table = appDesc.getTable(new InMemoryTableDescriptor("t1",
-          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> isd = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
-
-      appDesc.getInputStream(isd)
-          .map(mapFn)
-          .sendTo(table);
+      appDesc.getInputStream(isd).map(mapFn).sendTo(table);
     };
 
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(app, config);
-    executeRun(runner, config);
-    runner.waitForFinish();
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<Profile> profileStreamDesc = isd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
+
+    int numProfilesPerPartition = 10;
+    int numInputPartitions = 4;
+    Map<Integer, List<Profile>> inputProfiles =
+        TestTableData.generatePartitionedProfiles(numProfilesPerPartition * numInputPartitions, numInputPartitions);
+    TestRunner.of(app).addInputStream(profileStreamDesc, inputProfiles).run(Duration.ofSeconds(10));
 
-    for (int i = 0; i < partitionCount; i++) {
+    for (int i = 0; i < numInputPartitions; i++) {
       MyMapFunction mapFnCopy = MyMapFunction.getMapFunctionByTask(String.format("Partition %d", i));
-      assertEquals(count, mapFnCopy.received.size());
-      mapFnCopy.received.forEach(p -> Assert.assertTrue(mapFnCopy.table.get(p.getMemberId()) != null));
+      assertEquals(numProfilesPerPartition, mapFnCopy.received.size());
+      mapFnCopy.received.forEach(p -> assertNotNull(mapFnCopy.table.get(p.getMemberId())));
     }
   }
 
@@ -116,52 +105,49 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
     @Override
     public void describe(StreamApplicationDescriptor appDesc) {
       Table<KV<Integer, Profile>> table = appDesc.getTable(
-          new InMemoryTableDescriptor("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor("Profile", new NoOpSerde<>());
+          new InMemoryTableDescriptor<>("t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> profileISD = ksd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
+      profileISD.shouldBootstrap();
       appDesc.getInputStream(profileISD)
-          .map(m -> new KV(m.getMemberId(), m))
+          .map(m -> new KV<>(m.getMemberId(), m))
           .sendTo(table);
 
-      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor("PageView", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewISD = ksd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
       appDesc.getInputStream(pageViewISD)
           .map(pv -> {
             received.add(pv);
             return pv;
           })
-          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "p1")
+          .partitionBy(PageView::getMemberId, v -> v, KVSerde.of(new IntegerSerde(), new PageViewJsonSerde()), "p1")
           .join(table, new PageViewToProfileJoinFunction())
           .sink((m, collector, coordinator) -> joined.add(m));
     }
   }
 
   @Test
-  public void testStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
+  public void testStreamTableJoin() {
+    int totalPageViews = 40;
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.PageView.samza.system", "test");
-    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.Profile.samza.system", "test");
-    configs.put("streams.Profile.samza.bootstrap", "true");
-    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new StreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, StreamTableJoinApp.received.size());
-    assertEquals(count * partitionCount, StreamTableJoinApp.joined.size());
-    assertTrue(StreamTableJoinApp.joined.get(0) instanceof EnrichedPageView);
+    Map<Integer, List<PageView>> inputPageViews =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    // page view member ids are in [0, 10), so 10 profiles covers all of them
+    Map<Integer, List<Profile>> inputProfiles =
+        TestTableData.generatePartitionedProfiles(10, partitionCount);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
+        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc = isd
+        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
+
+    TestRunner.of(new StreamTableJoinApp())
+        .addInputStream(pageViewStreamDesc, inputPageViews)
+        .addInputStream(profileStreamDesc, inputProfiles)
+        .run(Duration.ofSeconds(10));
+
+    assertEquals(totalPageViews, StreamTableJoinApp.received.size());
+    assertEquals(totalPageViews, StreamTableJoinApp.joined.size());
+    assertNotNull(StreamTableJoinApp.joined.get(0));
   }
 
   static class DualStreamTableJoinApp implements StreamApplication {
@@ -178,29 +164,31 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
       PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
       PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
 
-      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor("t1", profileKVSerde));
+      Table<KV<Integer, Profile>> profileTable = appDesc.getTable(new InMemoryTableDescriptor<>("t1", profileKVSerde));
 
-      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor("test");
-      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor("Profile1", new NoOpSerde<>());
-      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor("Profile2", new NoOpSerde<>());
+      DelegatingSystemDescriptor ksd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      GenericInputDescriptor<Profile> profileISD1 = ksd.getInputDescriptor(PROFILE_STREAM + "1", new NoOpSerde<>());
+      profileISD1.shouldBootstrap();
+      GenericInputDescriptor<Profile> profileISD2 = ksd.getInputDescriptor(PROFILE_STREAM + "2", new NoOpSerde<>());
+      profileISD2.shouldBootstrap();
       MessageStream<Profile> profileStream1 = appDesc.getInputStream(profileISD1);
       MessageStream<Profile> profileStream2 = appDesc.getInputStream(profileISD2);
 
       profileStream1
           .map(m -> {
             sentToProfileTable1.add(m);
-            return new KV(m.getMemberId(), m);
+            return new KV<>(m.getMemberId(), m);
           })
           .sendTo(profileTable);
       profileStream2
           .map(m -> {
             sentToProfileTable2.add(m);
-            return new KV(m.getMemberId(), m);
+            return new KV<>(m.getMemberId(), m);
           })
           .sendTo(profileTable);
 
-      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor("PageView1", new NoOpSerde<PageView>());
-      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor("PageView2", new NoOpSerde<PageView>());
+      GenericInputDescriptor<PageView> pageViewISD1 = ksd.getInputDescriptor(PAGEVIEW_STREAM + "1", new NoOpSerde<>());
+      GenericInputDescriptor<PageView> pageViewISD2 = ksd.getInputDescriptor(PAGEVIEW_STREAM + "2", new NoOpSerde<>());
       MessageStream<PageView> pageViewStream1 = appDesc.getInputStream(pageViewISD1);
       MessageStream<PageView> pageViewStream2 = appDesc.getInputStream(pageViewISD2);
 
@@ -217,45 +205,40 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
   }
 
   @Test
-  public void testDualStreamTableJoin() throws Exception {
-
-    int count = 10;
-    PageView[] pageViews = TestTableData.generatePageViews(count);
-    Profile[] profiles = TestTableData.generateProfiles(count);
-
+  public void testDualStreamTableJoin() {
+    int totalPageViews = 40;
     int partitionCount = 4;
-    Map<String, String> configs = getBaseJobConfig(bootstrapUrl(), zkConnect());
-
-    configs.put("streams.Profile1.samza.system", "test");
-    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile1.samza.bootstrap", "true");
-    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.Profile2.samza.system", "test");
-    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
-    configs.put("streams.Profile2.samza.bootstrap", "true");
-    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView1.samza.system", "test");
-    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
-
-    configs.put("streams.PageView2.samza.system", "test");
-    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
-    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
-
-    Config config = new MapConfig(configs);
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new DualStreamTableJoinApp(), config);
-    executeRun(runner, config);
-    runner.waitForFinish();
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.sentToProfileTable2.size());
-
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews1.size());
-    assertEquals(count * partitionCount, DualStreamTableJoinApp.joinedPageViews2.size());
-    assertTrue(DualStreamTableJoinApp.joinedPageViews1.get(0) instanceof EnrichedPageView);
-    assertTrue(DualStreamTableJoinApp.joinedPageViews2.get(0) instanceof EnrichedPageView);
+    Map<Integer, List<PageView>> inputPageViews1 =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    Map<Integer, List<PageView>> inputPageViews2 =
+        TestTableData.generatePartitionedPageViews(totalPageViews, partitionCount);
+    // page view member ids are in [0, 10), so 10 profiles covers all of them
+    int numProfiles = 10;
+    Map<Integer, List<Profile>> inputProfiles1 = TestTableData.generatePartitionedProfiles(numProfiles, partitionCount);
+    Map<Integer, List<Profile>> inputProfiles2 = TestTableData.generatePartitionedProfiles(numProfiles, partitionCount);
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc1 = isd
+        .getInputDescriptor(PAGEVIEW_STREAM + "1", new NoOpSerde<>());
+    InMemoryInputDescriptor<PageView> pageViewStreamDesc2 = isd
+        .getInputDescriptor(PAGEVIEW_STREAM + "2", new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc1 = isd
+        .getInputDescriptor(PROFILE_STREAM + "1", new NoOpSerde<>());
+    InMemoryInputDescriptor<Profile> profileStreamDesc2 = isd
+        .getInputDescriptor(PROFILE_STREAM + "2", new NoOpSerde<>());
+
+    TestRunner.of(new DualStreamTableJoinApp())
+        .addInputStream(pageViewStreamDesc1, inputPageViews1)
+        .addInputStream(pageViewStreamDesc2, inputPageViews2)
+        .addInputStream(profileStreamDesc1, inputProfiles1)
+        .addInputStream(profileStreamDesc2, inputProfiles2)
+        .run(Duration.ofSeconds(10));
+
+    assertEquals(numProfiles, DualStreamTableJoinApp.sentToProfileTable1.size());
+    assertEquals(numProfiles, DualStreamTableJoinApp.sentToProfileTable2.size());
+    assertEquals(totalPageViews, DualStreamTableJoinApp.joinedPageViews1.size());
+    assertEquals(totalPageViews, DualStreamTableJoinApp.joinedPageViews2.size());
+    assertNotNull(DualStreamTableJoinApp.joinedPageViews1.get(0));
+    assertNotNull(DualStreamTableJoinApp.joinedPageViews2.get(0));
   }
 
   static Map<String, String> getBaseJobConfig(String bootstrapUrl, String zkConnect) {
@@ -283,8 +266,7 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
   }
 
   private static class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
-
-    private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
+    private static final Map<String, MyMapFunction> TASK_TO_MAP_FUNCTION_MAP = new HashMap<>();
 
     private transient List<Profile> received;
     private transient ReadWriteTable table;
@@ -294,17 +276,17 @@ public class TestLocalTableEndToEnd extends IntegrationTestHarness {
       table = context.getTaskContext().getTable("t1");
       this.received = new ArrayList<>();
 
-      taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
+      TASK_TO_MAP_FUNCTION_MAP.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
     }
 
     @Override
     public KV<Integer, Profile> apply(Profile profile) {
       received.add(profile);
-      return new KV(profile.getMemberId(), profile);
+      return new KV<>(profile.getMemberId(), profile);
     }
 
     public static MyMapFunction getMapFunctionByTask(String taskName) {
-      return taskToMapFunctionMap.get(taskName);
+      return TASK_TO_MAP_FUNCTION_MAP.get(taskName);
     }
   }
 
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
index 071f65e..34ac29a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputsEndToEnd.java
@@ -19,128 +19,190 @@
 
 package org.apache.samza.test.table;
 
-import com.google.common.collect.ImmutableList;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
-import org.apache.samza.SamzaException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.application.SamzaApplication;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
+import org.apache.samza.application.TaskApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
 import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
+import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
-import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
-import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.test.framework.StreamAssert;
 import org.apache.samza.test.framework.TestRunner;
 import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
 import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
 import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;
-import org.apache.samza.test.harness.IntegrationTestHarness;
 import org.junit.Test;
 
 import static org.apache.samza.test.table.TestTableData.EnrichedPageView;
 import static org.apache.samza.test.table.TestTableData.PageView;
 import static org.apache.samza.test.table.TestTableData.Profile;
 import static org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 
-public class TestLocalTableWithSideInputsEndToEnd extends IntegrationTestHarness {
+public class TestLocalTableWithSideInputsEndToEnd {
+  private static final String SYSTEM_NAME = "test";
   private static final String PAGEVIEW_STREAM = "pageview";
   private static final String PROFILE_STREAM = "profile";
+  private static final String PROFILE_TABLE = "profile-table";
   private static final String ENRICHED_PAGEVIEW_STREAM = "enrichedpageview";
+  private static final SystemStream OUTPUT_SYSTEM_STREAM = new SystemStream(SYSTEM_NAME, ENRICHED_PAGEVIEW_STREAM);
 
   @Test
-  public void testJoinWithSideInputsTable() {
+  public void testLowLevelJoinWithSideInputsTable() throws InterruptedException {
+    int partitionCount = 4;
+    IntegerSerde integerSerde = new IntegerSerde();
+    // for low-level, need to pre-partition the input in the same way that the profiles are partitioned
+    Map<Integer, List<PageView>> pageViewsPartitionedByMemberId =
+        TestTableData.generatePartitionedPageViews(20, partitionCount)
+            .values()
+            .stream()
+            .flatMap(List::stream)
+            .collect(Collectors.groupingBy(
+              pageView -> Math.abs(Arrays.hashCode(integerSerde.toBytes(pageView.getMemberId()))) % partitionCount));
+    runTest(
+        new LowLevelPageViewProfileJoin(),
+        pageViewsPartitionedByMemberId,
+        TestTableData.generatePartitionedProfiles(10, partitionCount));
+  }
+
+  @Test
+  public void testJoinWithSideInputsTable() throws InterruptedException {
     runTest(
-        "test",
         new PageViewProfileJoin(),
-        Arrays.asList(TestTableData.generatePageViews(10)),
-        Arrays.asList(TestTableData.generateProfiles(10)));
+        TestTableData.generatePartitionedPageViews(20, 4),
+        TestTableData.generatePartitionedProfiles(10, 4));
   }
 
   @Test
-  public void testJoinWithDurableSideInputTable() {
+  public void testJoinWithDurableSideInputTable() throws InterruptedException {
     runTest(
-        "test",
         new DurablePageViewProfileJoin(),
-        Arrays.asList(TestTableData.generatePageViews(5)),
-        Arrays.asList(TestTableData.generateProfiles(5)));
+        TestTableData.generatePartitionedPageViews(20, 4),
+        TestTableData.generatePartitionedProfiles(10, 4));
   }
 
-  private void runTest(String systemName, StreamApplication app, List<PageView> pageViews,
-      List<Profile> profiles) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PAGEVIEW_STREAM), systemName);
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, PROFILE_STREAM), systemName);
-    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID, ENRICHED_PAGEVIEW_STREAM), systemName);
-
-    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
-
+  private <T extends ApplicationDescriptor<?>> void runTest(SamzaApplication<T> app,
+      Map<Integer, List<PageView>> pageViews,
+      Map<Integer, List<Profile>> profiles) throws InterruptedException {
+    InMemorySystemDescriptor isd = new InMemorySystemDescriptor(SYSTEM_NAME);
     InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
-        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>());
-
+        .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>());
     InMemoryInputDescriptor<Profile> profileStreamDesc = isd
-        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>());
-
+        .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>());
     InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd
-        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>());
+        .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>());
 
-    TestRunner
-        .of(app)
+    TestRunner.of(app)
         .addInputStream(pageViewStreamDesc, pageViews)
         .addInputStream(profileStreamDesc, profiles)
         .addOutputStream(outputStreamDesc, 1)
-        .addConfig(new MapConfig(configs))
-        .run(Duration.ofMillis(100000));
-
-    try {
-      Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
-      List<EnrichedPageView> results = result.values().stream()
-          .flatMap(List::stream)
-          .collect(Collectors.toList());
-
-      List<EnrichedPageView> expectedEnrichedPageviews = pageViews.stream()
-          .flatMap(pv -> profiles.stream()
-              .filter(profile -> pv.memberId == profile.memberId)
-              .map(profile -> new EnrichedPageView(pv.pageKey, profile.memberId, profile.company)))
-          .collect(Collectors.toList());
-
-      boolean successfulJoin = results.stream().allMatch(expectedEnrichedPageviews::contains);
-      assertEquals("Mismatch between the expected and actual join count", expectedEnrichedPageviews.size(), results.size());
-      assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
-    } catch (SamzaException e) {
-      e.printStackTrace();
+        .run(Duration.ofSeconds(10));
+
+    List<EnrichedPageView> expectedEnrichedPageViews = buildExpectedEnrichedPageViews(pageViews, profiles);
+    StreamAssert.containsInAnyOrder(expectedEnrichedPageViews, outputStreamDesc, Duration.ofSeconds(1));
+  }
+
+  private static List<EnrichedPageView> buildExpectedEnrichedPageViews(Map<Integer, List<PageView>> pageViews,
+      Map<Integer, List<Profile>> profiles) {
+    ImmutableMap.Builder<Integer, Profile> profilesByMemberIdBuilder = new ImmutableMap.Builder<>();
+    profiles.values()
+        .stream()
+        .flatMap(List::stream)
+        .forEach(profile -> profilesByMemberIdBuilder.put(profile.getMemberId(), profile));
+    Map<Integer, Profile> profilesByMemberId = profilesByMemberIdBuilder.build();
+    ImmutableList.Builder<EnrichedPageView> enrichedPageViewsBuilder = new ImmutableList.Builder<>();
+    pageViews.values()
+        .stream()
+        .flatMap(List::stream)
+        .forEach(pageView -> Optional.ofNullable(profilesByMemberId.get(pageView.getMemberId()))
+            .ifPresent(profile -> enrichedPageViewsBuilder.add(
+                new EnrichedPageView(pageView.getPageKey(), profile.getMemberId(), profile.getCompany()))));
+    return enrichedPageViewsBuilder.build();
+  }
+
+  static class LowLevelPageViewProfileJoin implements TaskApplication {
+    @Override
+    public void describe(TaskApplicationDescriptor appDescriptor) {
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM_NAME);
+      appDescriptor.withInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<>()));
+      appDescriptor.withInputStream(sd.getInputDescriptor(PROFILE_STREAM, new NoOpSerde<>()));
+
+      TableDescriptor<Integer, Profile, ?> tableDescriptor = new InMemoryTableDescriptor<>(PROFILE_TABLE,
+          KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())).withSideInputs(ImmutableList.of(PROFILE_STREAM))
+          .withSideInputsProcessor((msg, store) -> {
+            Profile profile = (Profile) msg.getMessage();
+            int key = profile.getMemberId();
+            return ImmutableList.of(new Entry<>(key, profile));
+          });
+      appDescriptor.withTable(tableDescriptor);
+
+      appDescriptor.withOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()));
+
+      appDescriptor.withTaskFactory((StreamTaskFactory) PageViewProfileJoinStreamTask::new);
     }
   }
 
-  static class PageViewProfileJoin implements StreamApplication {
-    static final String PROFILE_TABLE = "profile-table";
+  static class PageViewProfileJoinStreamTask implements InitableTask, StreamTask {
+    private ReadWriteTable<Integer, Profile> profileTable;
 
     @Override
+    public void init(Context context) {
+      this.profileTable = context.getTaskContext().getTable(PROFILE_TABLE);
+    }
+
+    @Override
+    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+      PageView pageView = (PageView) envelope.getMessage();
+      Profile profile = this.profileTable.get(pageView.getMemberId());
+      if (profile != null) {
+        EnrichedPageView enrichedPageView =
+            new EnrichedPageView(pageView.getPageKey(), profile.getMemberId(), profile.getCompany());
+        collector.send(new OutgoingMessageEnvelope(OUTPUT_SYSTEM_STREAM, enrichedPageView));
+      }
+    }
+  }
+
+  static class PageViewProfileJoin implements StreamApplication {
+    @Override
     public void describe(StreamApplicationDescriptor appDescriptor) {
       Table<KV<Integer, TestTableData.Profile>> table = appDescriptor.getTable(getTableDescriptor());
-      KafkaSystemDescriptor sd =
-          new KafkaSystemDescriptor("test");
+      DelegatingSystemDescriptor sd = new DelegatingSystemDescriptor(SYSTEM_NAME);
       appDescriptor.getInputStream(sd.getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>()))
-          .partitionBy(TestTableData.PageView::getMemberId, v -> v, KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()), "partition-page-view")
+          .partitionBy(TestTableData.PageView::getMemberId, v -> v,
+              KVSerde.of(new IntegerSerde(), new TestTableData.PageViewJsonSerde()), "partition-page-view")
           .join(table, new PageViewToProfileJoinFunction())
           .sendTo(appDescriptor.getOutputStream(sd.getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>())));
     }
 
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new InMemoryTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+      return new InMemoryTableDescriptor<>(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
             Profile profile = (Profile) msg.getMessage();
@@ -153,7 +215,7 @@ public class TestLocalTableWithSideInputsEndToEnd extends IntegrationTestHarness
   static class DurablePageViewProfileJoin extends PageViewProfileJoin {
     @Override
     protected TableDescriptor<Integer, Profile, ?> getTableDescriptor() {
-      return new RocksDbTableDescriptor(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+      return new RocksDbTableDescriptor<>(PROFILE_TABLE, KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
           .withSideInputs(ImmutableList.of(PROFILE_STREAM))
           .withSideInputsProcessor((msg, store) -> {
             TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
@@ -162,4 +224,4 @@ public class TestLocalTableWithSideInputsEndToEnd extends IntegrationTestHarness
           });
     }
   }
-}
\ No newline at end of file
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
index 76c56b0..39f9b02 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
@@ -20,19 +20,29 @@
 package org.apache.samza.test.table;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.type.TypeReference;
 
 
 public class TestTableData {
+  private static final IntegerSerde INTEGER_SERDE = new IntegerSerde();
 
   public static class PageView implements Serializable {
     @JsonProperty("pageKey")
@@ -205,6 +215,35 @@ public class TestTableData {
     return pageviews;
   }
 
+  /**
+   * Create page views and spread page views with the same member id across different partitions.
+   * Member ids are spread out like this to make sure that partitionBy operators properly repartition the messages.
+   * Member ids are assigned round-robin from [0, 10), while page keys are chosen randomly.
+   *
+   * Example:
+   * generatePartitionedPageViews(20, 4) will return:
+   * 0 -> page views with member ids [0, 5)
+   * 1 -> page views with member ids [5, 10)
+   * 2 -> page views with member ids [0, 5)
+   * 3 -> page views with member ids [5, 10)
+   */
+  public static Map<Integer, List<PageView>> generatePartitionedPageViews(int numPageViews, int partitionCount) {
+    Preconditions.checkArgument(numPageViews % partitionCount == 0, "partitionCount must divide numPageViews evenly");
+    int numPerPartition = numPageViews / partitionCount;
+    Random random = new Random();
+    ImmutableMap.Builder<Integer, List<PageView>> pageViewsBuilder = new ImmutableMap.Builder<>();
+    for (int i = 0; i < partitionCount; i++) {
+      pageViewsBuilder.put(i, new ArrayList<>());
+    }
+    Map<Integer, List<PageView>> pageViews = pageViewsBuilder.build();
+    for (int i = 0; i < numPageViews; i++) {
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+      int memberId = i % 10;
+      pageViews.get(i / numPerPartition).add(new PageView(pagekey, memberId));
+    }
+    return pageViews;
+  }
+
   static public PageView[] generatePageViewsWithDistinctKeys(int count) {
     Random random = new Random();
     PageView[] pageviews = new PageView[count];
@@ -227,4 +266,20 @@ public class TestTableData {
     return profiles;
   }
 
+  /**
+   * Create profiles and partition them based on the bytes representation of the member id. Partitioning by the
+   * serialized bytes matches the partition function used by InMemorySystemProducer (which the test framework uses),
+   * so that table joins can be tested.
+   * One profile for each member id in [0, numProfiles) is created.
+   */
+  public static Map<Integer, List<Profile>> generatePartitionedProfiles(int numProfiles, int partitionCount) {
+    Random random = new Random();
+    return IntStream.range(0, numProfiles)
+        .mapToObj(i -> {
+          String company = COMPANIES[random.nextInt(COMPANIES.length - 1)];
+          return new Profile(i, company);
+        })
+        .collect(Collectors.groupingBy(
+          profile -> Math.abs(Arrays.hashCode(INTEGER_SERDE.toBytes(profile.getMemberId()))) % partitionCount));
+  }
 }
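
As a usage note, a hedged helper showing the partition placement the generator above relies on (assuming InMemorySystemProducer partitions keyed messages by the hash of the serialized key bytes, which is the function mirrored here; the helper itself is hypothetical):

    // Hypothetical helper for TestTableData: the partition a profile with the given member id lands in.
    private static int expectedPartition(int memberId, int partitionCount) {
      byte[] keyBytes = INTEGER_SERDE.toBytes(memberId);
      return Math.abs(Arrays.hashCode(keyBytes)) % partitionCount;
    }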