You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2019/03/29 16:15:43 UTC

[beam] branch master updated: [BEAM-6932] SamzaRunner: migrate to use new Samza 1.1.0 liraries (#8163)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 91d59e7  [BEAM-6932] SamzaRunner: migrate to use new Samza 1.1.0 liraries (#8163)
91d59e7 is described below

commit 91d59e78ffec3771a1d646c4e320fff571393829
Author: xinyuiscool <xi...@gmail.com>
AuthorDate: Fri Mar 29 09:15:30 2019 -0700

    [BEAM-6932] SamzaRunner: migrate to use new Samza 1.1.0 liraries (#8163)
---
 runners/samza/build.gradle                         |   2 +-
 .../beam/runners/samza/SamzaExecutionContext.java  | 122 ++++++++++++++-
 .../beam/runners/samza/SamzaPipelineResult.java    |   4 +-
 .../org/apache/beam/runners/samza/SamzaRunner.java | 167 ++-------------------
 .../samza/container/BeamContainerRunner.java       |  45 +++++-
 .../apache/beam/runners/samza/runtime/DoFnOp.java  |  14 +-
 .../beam/runners/samza/runtime/GroupByKeyOp.java   |  15 +-
 .../org/apache/beam/runners/samza/runtime/Op.java  |   8 +-
 .../beam/runners/samza/runtime/OpAdapter.java      |  24 +--
 .../beam/runners/samza/runtime/OpMessage.java      |   5 +-
 .../samza/runtime/SamzaDoFnInvokerRegistrar.java   |   4 +-
 .../runners/samza/runtime/SamzaDoFnRunners.java    |  12 +-
 .../samza/runtime/SamzaStoreStateInternals.java    |   2 +-
 .../samza/runtime/SamzaTimerInternalsFactory.java  |  12 +-
 .../runners/samza/translation/ConfigBuilder.java   |   7 +-
 .../translation/FlattenPCollectionsTranslator.java |  14 +-
 .../samza/translation/GroupByKeyTranslator.java    |  27 +++-
 .../samza/translation/ImpulseTranslator.java       |  30 ++--
 .../translation/ParDoBoundMultiTranslator.java     |  18 ++-
 .../translation/PortableTranslationContext.java    |  88 ++++++++---
 .../runners/samza/translation/ReadTranslator.java  |  60 ++++----
 .../translation/SamzaPublishViewTranslator.java    |  13 +-
 .../samza/translation/TranslationContext.java      | 133 ++++++++++------
 .../samza/adapter/BoundedSourceSystemTest.java     |   7 +-
 .../samza/adapter/UnboundedSourceSystemTest.java   |  15 +-
 .../runtime/SamzaStoreStateInternalsTest.java      |   8 +-
 .../runtime/SamzaTimerInternalsFactoryTest.java    |  13 +-
 27 files changed, 513 insertions(+), 356 deletions(-)

diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index faa4d51..98a5686 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -34,7 +34,7 @@ configurations {
   validatesRunner
 }
 
-def samza_version = "0.14.1"
+def samza_version = "1.1.0"
 
 dependencies {
   shadow library.java.vendored_guava_20_0
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
index 162c7d8..af65135 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaExecutionContext.java
@@ -17,13 +17,57 @@
  */
 package org.apache.beam.runners.samza;
 
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
+import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.samza.context.ApplicationContainerContext;
+import org.apache.samza.context.ApplicationContainerContextFactory;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.ExternalContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Runtime context for the Samza runner. */
-public class SamzaExecutionContext {
+public class SamzaExecutionContext implements ApplicationContainerContext {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaExecutionContext.class);
+  private static final String SAMZA_WORKER_ID = "samza_py_worker_id";
+
+  private final SamzaPipelineOptions options;
   private SamzaMetricsContainer metricsContainer;
   private JobBundleFactory jobBundleFactory;
+  private GrpcFnServer<FnApiControlClientPoolService> fnControlServer;
+  private GrpcFnServer<GrpcDataService> fnDataServer;
+  private GrpcFnServer<GrpcStateService> fnStateServer;
+  private ControlClientPool controlClientPool;
+  private IdGenerator idGenerator = IdGenerators.incrementingLongs();
+
+  public SamzaExecutionContext(SamzaPipelineOptions options) {
+    this.options = options;
+  }
+
+  public SamzaPipelineOptions getPipelineOptions() {
+    return options;
+  }
 
   public SamzaMetricsContainer getMetricsContainer() {
     return this.metricsContainer;
@@ -40,4 +84,80 @@ public class SamzaExecutionContext {
   void setJobBundleFactory(JobBundleFactory jobBundleFactory) {
     this.jobBundleFactory = jobBundleFactory;
   }
+
+  @Override
+  public void start() {
+    checkState(getJobBundleFactory() == null, "jobBundleFactory has been created!");
+
+    if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+      try {
+        controlClientPool = MapControlClientPool.create();
+        final ExecutorService dataExecutor = Executors.newCachedThreadPool();
+
+        fnControlServer =
+            GrpcFnServer.allocatePortAndCreateFor(
+                FnApiControlClientPoolService.offeringClientsToPool(
+                    controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
+                ServerFactory.createWithPortSupplier(
+                    () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
+
+        fnDataServer =
+            GrpcFnServer.allocatePortAndCreateFor(
+                GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
+                ServerFactory.createDefault());
+
+        fnStateServer =
+            GrpcFnServer.allocatePortAndCreateFor(
+                GrpcStateService.create(), ServerFactory.createDefault());
+
+        final long waitTimeoutMs =
+            SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+        final InstructionRequestHandler instructionHandler =
+            controlClientPool.getSource().take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
+        final EnvironmentFactory environmentFactory =
+            environment -> RemoteEnvironment.forHandler(environment, instructionHandler);
+        // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+        jobBundleFactory =
+            SingleEnvironmentInstanceJobBundleFactory.create(
+                environmentFactory, fnDataServer, fnStateServer, idGenerator);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Running samza in Beam portable mode but failed to create job bundle factory", e);
+      }
+
+      setJobBundleFactory(jobBundleFactory);
+    }
+  }
+
+  @Override
+  public void stop() {
+    closeFnServer(fnControlServer);
+    fnControlServer = null;
+    closeFnServer(fnDataServer);
+    fnDataServer = null;
+    closeFnServer(fnStateServer);
+    fnStateServer = null;
+  }
+
+  private void closeFnServer(GrpcFnServer<?> fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  /** The factory to return this {@link SamzaExecutionContext}. */
+  public class Factory implements ApplicationContainerContextFactory<SamzaExecutionContext> {
+
+    @Override
+    public SamzaExecutionContext create(
+        ExternalContext externalContext, JobContext jobContext, ContainerContext containerContext) {
+
+      final MetricsRegistryMap metricsRegistry =
+          (MetricsRegistryMap) containerContext.getContainerMetricsRegistry();
+      SamzaExecutionContext.this.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+      return SamzaExecutionContext.this;
+    }
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
index 9badbc3..db71dc9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
@@ -53,7 +53,7 @@ public class SamzaPipelineResult implements PipelineResult {
 
   @Override
   public State cancel() {
-    runner.kill(app);
+    runner.kill();
     return waitUntilFinish();
   }
 
@@ -88,7 +88,7 @@ public class SamzaPipelineResult implements PipelineResult {
   }
 
   private StateInfo getStateInfo() {
-    final ApplicationStatus status = runner.status(app);
+    final ApplicationStatus status = runner.status();
     switch (status.getStatusCode()) {
       case New:
         return new StateInfo(State.STOPPED);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index bd4148b..6876162 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -17,24 +17,8 @@
  */
 package org.apache.beam.runners.samza;
 
-import java.time.Duration;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.ServerFactory;
-import org.apache.beam.runners.fnexecution.control.ControlClientPool;
-import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
-import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
-import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
-import org.apache.beam.runners.fnexecution.control.SingleEnvironmentInstanceJobBundleFactory;
-import org.apache.beam.runners.fnexecution.data.GrpcDataService;
-import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
-import org.apache.beam.runners.fnexecution.state.GrpcStateService;
-import org.apache.beam.runners.samza.metrics.SamzaMetricsContainer;
 import org.apache.beam.runners.samza.translation.ConfigBuilder;
 import org.apache.beam.runners.samza.translation.PViewToIdMapper;
 import org.apache.beam.runners.samza.translation.PortableTranslationContext;
@@ -45,19 +29,12 @@ import org.apache.beam.runners.samza.translation.TranslationContext;
 import org.apache.beam.runners.samza.util.PipelineDotRenderer;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.fn.IdGenerators;
-import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.runtime.ApplicationRunners;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,15 +44,6 @@ import org.slf4j.LoggerFactory;
  */
 public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaRunner.class);
-  // temporarily hardcode the worker id before we start supporting multiple workers
-  private static final String SAMZA_WORKER_ID = "samza_py_worker_id";
-
-  private GrpcFnServer<FnApiControlClientPoolService> fnControlServer;
-  private GrpcFnServer<GrpcDataService> fnDataServer;
-  private GrpcFnServer<GrpcStateService> fnStateServer;
-  private ControlClientPool controlClientPool;
-  private JobBundleFactory jobBundleFactory;
-  private ExecutorService dataExecutor;
 
   public static SamzaRunner fromOptions(PipelineOptions opts) {
     final SamzaPipelineOptions samzaOptions = SamzaPipelineOptionsValidator.validate(opts);
@@ -88,118 +56,20 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
     this.options = options;
   }
 
-  private static void closeAutoClosable(AutoCloseable closeable) {
-    try (AutoCloseable closer = closeable) {
-      // do nothing
-    } catch (Exception e) {
-      LOG.error(
-          "Failed to close {}. Ignore since this is shutdown process...",
-          closeable.getClass().getSimpleName(),
-          e);
-    }
-  }
-
-  private void setUpContextManager(
-      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
-    streamGraph.withContextManager(
-        new ContextManager() {
-          @Override
-          public void init(Config config, TaskContext context) {
-            if (executionContext.getMetricsContainer() == null) {
-              final MetricsRegistryMap metricsRegistry =
-                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
-              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
-            }
-
-            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
-              if (jobBundleFactory == null) {
-                try {
-                  final long waitTimeoutMs =
-                      SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
-                  final InstructionRequestHandler instructionHandler =
-                      controlClientPool
-                          .getSource()
-                          .take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
-                  final EnvironmentFactory environmentFactory =
-                      environment -> RemoteEnvironment.forHandler(environment, instructionHandler);
-                  // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
-                  jobBundleFactory =
-                      SingleEnvironmentInstanceJobBundleFactory.create(
-                          environmentFactory,
-                          fnDataServer,
-                          fnStateServer,
-                          IdGenerators.incrementingLongs());
-                } catch (Exception e) {
-                  throw new RuntimeException(
-                      "Running samza in Beam portable mode but failed to create job bundle factory",
-                      e);
-                }
-                executionContext.setJobBundleFactory(jobBundleFactory);
-              }
-            }
-
-            context.setUserContext(executionContext);
-          }
-
-          @Override
-          public void close() {
-            closeAutoClosable(fnControlServer);
-            fnControlServer = null;
-            closeAutoClosable(fnDataServer);
-            fnDataServer = null;
-            closeAutoClosable(fnStateServer);
-            fnStateServer = null;
-            if (dataExecutor != null) {
-              dataExecutor.shutdown();
-              dataExecutor = null;
-            }
-            controlClientPool = null;
-            closeAutoClosable(jobBundleFactory);
-            jobBundleFactory = null;
-          }
-        });
-  }
-
-  private void setUpFnApiServer() {
-    controlClientPool = MapControlClientPool.create();
-    dataExecutor = Executors.newCachedThreadPool();
-    try {
-      fnControlServer =
-          GrpcFnServer.allocatePortAndCreateFor(
-              FnApiControlClientPoolService.offeringClientsToPool(
-                  controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
-              ServerFactory.createWithPortSupplier(
-                  () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
-
-      fnDataServer =
-          GrpcFnServer.allocatePortAndCreateFor(
-              GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
-              ServerFactory.createDefault());
-
-      fnStateServer =
-          GrpcFnServer.allocatePortAndCreateFor(
-              GrpcStateService.create(), ServerFactory.createDefault());
-    } catch (Exception e) {
-      LOG.error("Failed to set up fn api servers", e);
-      throw new RuntimeException(e);
-    }
-  }
-
   SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
-    final SamzaExecutionContext executionContext = new SamzaExecutionContext();
-    // TODO: this will be moved to setUpContextManager in samza 1.0 migration
-    setUpFnApiServer();
     ConfigBuilder configBuilder = new ConfigBuilder(options);
     SamzaPortablePipelineTranslator.createConfig(pipeline, configBuilder);
-    final ApplicationRunner runner = ApplicationRunner.fromConfig(configBuilder.build());
+    final SamzaExecutionContext executionContext = new SamzaExecutionContext(options);
     final StreamApplication app =
-        (streamGraph, config) -> {
-          setUpContextManager(streamGraph, executionContext);
+        appDescriptor -> {
+          appDescriptor.withApplicationContainerContextFactory(executionContext.new Factory());
           SamzaPortablePipelineTranslator.translate(
-              pipeline, new PortableTranslationContext(streamGraph, options));
+              pipeline, new PortableTranslationContext(appDescriptor, options));
         };
+    final ApplicationRunner runner =
+        ApplicationRunners.getApplicationRunner(app, configBuilder.build());
     final SamzaPipelineResult result = new SamzaPipelineResult(app, runner, executionContext);
-    runner.run(app);
+    runner.run();
     return result;
   }
 
@@ -217,28 +87,23 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
       LOG.debug("Post-processed Beam pipeline:\n{}", PipelineDotRenderer.toDotString(pipeline));
     }
 
-    // Add a dummy source for use in special cases (TestStream, empty flatten)
-    final PValue dummySource = pipeline.apply("Dummy Input Source", Create.of("dummy"));
     final Map<PValue, String> idMap = PViewToIdMapper.buildIdMap(pipeline);
 
     final ConfigBuilder configBuilder = new ConfigBuilder(options);
     SamzaPipelineTranslator.createConfig(pipeline, options, idMap, configBuilder);
-    final ApplicationRunner runner = ApplicationRunner.fromConfig(configBuilder.build());
-
-    final SamzaExecutionContext executionContext = new SamzaExecutionContext();
+    final SamzaExecutionContext executionContext = new SamzaExecutionContext(options);
 
     final StreamApplication app =
-        new StreamApplication() {
-          @Override
-          public void init(StreamGraph streamGraph, Config config) {
-            setUpContextManager(streamGraph, executionContext);
-            SamzaPipelineTranslator.translate(
-                pipeline, new TranslationContext(streamGraph, idMap, options, dummySource));
-          }
+        appDescriptor -> {
+          appDescriptor.withApplicationContainerContextFactory(executionContext.new Factory());
+          SamzaPipelineTranslator.translate(
+              pipeline, new TranslationContext(appDescriptor, idMap, options));
         };
 
+    final ApplicationRunner runner =
+        ApplicationRunners.getApplicationRunner(app, configBuilder.build());
     final SamzaPipelineResult result = new SamzaPipelineResult(app, runner, executionContext);
-    runner.run(app);
+    runner.run();
     return result;
   }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
index c1e0c99..be7aa4f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/container/BeamContainerRunner.java
@@ -18,34 +18,67 @@
 package org.apache.beam.runners.samza.container;
 
 import java.time.Duration;
-import org.apache.samza.application.StreamApplication;
+import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.descriptors.ApplicationDescriptor;
+import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.runtime.LocalContainerRunner;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ContainerLaunchUtil;
+import org.apache.samza.util.SamzaUncaughtExceptionHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Runs the beam Yarn container, using the static global job model. */
-public class BeamContainerRunner extends LocalContainerRunner {
+public class BeamContainerRunner implements ApplicationRunner {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerCfgFactory.class);
 
-  public BeamContainerRunner(Config config) {
-    super(ContainerCfgFactory.jobModel, System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()));
+  private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
+
+  public BeamContainerRunner(SamzaApplication app, Config config) {
+    this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
+  }
+
+  @Override
+  public void run(ExternalContext externalContext) {
+    Thread.setDefaultUncaughtExceptionHandler(
+        new SamzaUncaughtExceptionHandler(
+            () -> {
+              LOG.info("Exiting process now.");
+              System.exit(1);
+            }));
+
+    ContainerLaunchUtil.run(
+        appDesc,
+        System.getenv(ShellCommandConfig.ENV_CONTAINER_ID()),
+        ContainerCfgFactory.jobModel);
+  }
+
+  @Override
+  public void kill() {
+    // Do nothing. Yarn will kill the container.
   }
 
   @Override
-  public ApplicationStatus status(StreamApplication app) {
+  public ApplicationStatus status() {
+    // The container is running during the life span of this object.
     return ApplicationStatus.Running;
   }
 
   @Override
   public void waitForFinish() {
+    // Container run is synchronous
+    // so calling waitForFinish() after run() should return immediately
     LOG.info("Container has stopped");
   }
 
   @Override
   public boolean waitForFinish(Duration timeout) {
+    // Container run is synchronous
+    // so calling waitForFinish() after run() should return immediately
     LOG.info("Container has stopped");
     return true;
   }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 663f0d0..eb1d997 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -54,8 +54,8 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,8 +138,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
   @Override
   public void open(
       Config config,
-      TaskContext context,
-      TimerRegistry<KeyedTimerData<Void>> timerRegistry,
+      Context context,
+      Scheduler<KeyedTimerData<Void>> timerRegistry,
       OpEmitter<OutT> emitter) {
     this.inputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
     this.sideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -154,12 +154,12 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
 
     final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
         SamzaStoreStateInternals.createStateInternalFactory(
-            null, context, pipelineOptions, signature, mainOutputTag);
+            null, context.getTaskContext(), pipelineOptions, signature, mainOutputTag);
 
     this.timerInternalsFactory =
         SamzaTimerInternalsFactory.createTimerInternalFactory(
             keyCoder,
-            (TimerRegistry) timerRegistry,
+            (Scheduler) timerRegistry,
             getTimerStateId(signature),
             nonKeyedStateInternalsFactory,
             windowingStrategy,
@@ -170,7 +170,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
 
     if (isPortable) {
       SamzaExecutionContext samzaExecutionContext =
-          (SamzaExecutionContext) context.getUserContext();
+          (SamzaExecutionContext) context.getApplicationContainerContext();
       ExecutableStage executableStage = ExecutableStage.fromPayload(stagePayload);
       stageBundleFactory = samzaExecutionContext.getJobBundleFactory().forStage(executableStage);
       this.fnRunner =
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
index 76f7a32..f593ecc 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/GroupByKeyOp.java
@@ -46,8 +46,8 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -97,8 +97,8 @@ public class GroupByKeyOp<K, InputT, OutputT>
   @Override
   public void open(
       Config config,
-      TaskContext context,
-      TimerRegistry<KeyedTimerData<K>> timerRegistry,
+      Context context,
+      Scheduler<KeyedTimerData<K>> timerRegistry,
       OpEmitter<KV<K, OutputT>> emitter) {
     this.pipelineOptions =
         Base64Serializer.deserializeUnchecked(
@@ -108,7 +108,7 @@ public class GroupByKeyOp<K, InputT, OutputT>
 
     final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory =
         SamzaStoreStateInternals.createStateInternalFactory(
-            null, context, pipelineOptions, null, mainOutputTag);
+            null, context.getTaskContext(), pipelineOptions, null, mainOutputTag);
 
     final DoFnRunners.OutputManager outputManager = outputManagerFactory.create(emitter);
 
@@ -117,7 +117,7 @@ public class GroupByKeyOp<K, InputT, OutputT>
             mainOutputTag.getId(),
             Collections.singletonMap(
                 SamzaStoreStateInternals.BEAM_STORE,
-                SamzaStoreStateInternals.getBeamStore(context)),
+                SamzaStoreStateInternals.getBeamStore(context.getTaskContext())),
             keyCoder,
             pipelineOptions.getStoreBatchGetSize());
 
@@ -170,7 +170,8 @@ public class GroupByKeyOp<K, InputT, OutputT>
             windowingStrategy,
             DoFnSchemaInformation.create());
 
-    final SamzaExecutionContext executionContext = (SamzaExecutionContext) context.getUserContext();
+    final SamzaExecutionContext executionContext =
+        (SamzaExecutionContext) context.getApplicationContainerContext();
     this.fnRunner =
         DoFnRunnerWithMetrics.wrap(doFnRunner, executionContext.getMetricsContainer(), stepName);
   }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
index 793ce6a..cbf5c46 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/Op.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.samza.runtime;
 import java.io.Serializable;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
 import org.joda.time.Instant;
 
 /**
@@ -41,8 +41,8 @@ public interface Op<InT, OutT, K> extends Serializable {
    */
   default void open(
       Config config,
-      TaskContext taskContext,
-      TimerRegistry<KeyedTimerData<K>> timerRegistry,
+      Context context,
+      Scheduler<KeyedTimerData<K>> timerRegistry,
       OpEmitter<OutT> emitter) {}
 
   void processElement(WindowedValue<InT> inputElement, OpEmitter<OutT> emitter);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
index 49601ff..8b958db 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpAdapter.java
@@ -24,11 +24,11 @@ import java.util.List;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.context.Context;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.task.TaskContext;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory;
 public class OpAdapter<InT, OutT, K>
     implements FlatMapFunction<OpMessage<InT>, OpMessage<OutT>>,
         WatermarkFunction<OpMessage<OutT>>,
-        TimerFunction<KeyedTimerData<K>, OpMessage<OutT>>,
+        ScheduledFunction<KeyedTimerData<K>, OpMessage<OutT>>,
         Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(OpAdapter.class);
 
@@ -46,7 +46,7 @@ public class OpAdapter<InT, OutT, K>
   private transient Instant outputWatermark;
   private transient OpEmitter<OutT> emitter;
   private transient Config config;
-  private transient TaskContext taskContext;
+  private transient Context context;
 
   public static <InT, OutT, K> FlatMapFunction<OpMessage<InT>, OpMessage<OutT>> adapt(
       Op<InT, OutT, K> op) {
@@ -58,18 +58,18 @@ public class OpAdapter<InT, OutT, K>
   }
 
   @Override
-  public final void init(Config config, TaskContext context) {
+  public final void init(Context context) {
     this.outputList = new ArrayList<>();
     this.emitter = new OpEmitterImpl();
-    this.config = config;
-    this.taskContext = context;
+    this.config = context.getJobContext().getConfig();
+    this.context = context;
   }
 
   @Override
-  public final void registerTimer(TimerRegistry<KeyedTimerData<K>> timerRegistry) {
-    assert taskContext != null;
+  public final void schedule(Scheduler<KeyedTimerData<K>> timerRegistry) {
+    assert context != null;
 
-    op.open(config, taskContext, timerRegistry, emitter);
+    op.open(config, context, timerRegistry, emitter);
   }
 
   @Override
@@ -124,7 +124,7 @@ public class OpAdapter<InT, OutT, K>
   }
 
   @Override
-  public Collection<OpMessage<OutT>> onTimer(KeyedTimerData<K> keyedTimerData, long time) {
+  public Collection<OpMessage<OutT>> onCallback(KeyedTimerData<K> keyedTimerData, long time) {
     assert outputList.isEmpty();
 
     try {
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpMessage.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpMessage.java
index ab28ac5..34254b9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpMessage.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/OpMessage.java
@@ -21,8 +21,9 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.joda.time.Instant;
 
 /**
- * Actual message type used in Samza {@link org.apache.samza.operators.StreamGraph}. It contains
- * either an element of main inputs or the collection results from a view (used as side input).
+ * Actual message type used in Samza {@link org.apache.samza.application.StreamApplication}. It
+ * contains either an element of main inputs or the collection results from a view (used as side
+ * input).
  */
 public class OpMessage<T> {
   /**
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnInvokerRegistrar.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnInvokerRegistrar.java
index 5ac9c11..ebb01b0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnInvokerRegistrar.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnInvokerRegistrar.java
@@ -20,14 +20,14 @@ package org.apache.beam.runners.samza.runtime;
 import java.util.Map;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
 
 /** A registrar for Samza DoFnInvoker. */
 public interface SamzaDoFnInvokerRegistrar {
 
   /** Returns the invoker for a {@link DoFn}. */
   <InputT, OutputT> DoFnInvoker<InputT, OutputT> invokerFor(
-      DoFn<InputT, OutputT> fn, TaskContext context);
+      DoFn<InputT, OutputT> fn, Context context);
 
   /** Returns the configs for a {@link DoFn}. */
   <InputT, OutputT> Map<String, String> configFor(DoFn<InputT, OutputT> fn);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 0dd93ee..692b664 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -49,7 +49,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.context.Context;
 import org.joda.time.Instant;
 
 /** A factory for Samza runner translator to create underlying DoFnRunner used in {@link DoFnOp}. */
@@ -61,7 +61,7 @@ public class SamzaDoFnRunners {
       DoFn<InT, FnOutT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
       String stepName,
-      TaskContext taskContext,
+      Context context,
       TupleTag<FnOutT> mainOutputTag,
       SideInputHandler sideInputHandler,
       SamzaTimerInternalsFactory<?> timerInternalsFactory,
@@ -77,10 +77,10 @@ public class SamzaDoFnRunners {
     final DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
     final SamzaStoreStateInternals.Factory<?> stateInternalsFactory =
         SamzaStoreStateInternals.createStateInternalFactory(
-            keyCoder, taskContext, pipelineOptions, signature, mainOutputTag);
+            keyCoder, context.getTaskContext(), pipelineOptions, signature, mainOutputTag);
 
     final SamzaExecutionContext executionContext =
-        (SamzaExecutionContext) taskContext.getUserContext();
+        (SamzaExecutionContext) context.getApplicationContainerContext();
     if (signature.usesState()) {
       keyedInternals = new KeyedInternals(stateInternalsFactory, timerInternalsFactory);
       stateInternals = keyedInternals.stateInternals();
@@ -163,10 +163,10 @@ public class SamzaDoFnRunners {
       StageBundleFactory stageBundleFactory,
       TupleTag<FnOutT> mainOutputTag,
       Map<String, TupleTag<?>> idToTupleTagMap,
-      TaskContext taskContext,
+      Context context,
       String stepName) {
     final SamzaExecutionContext executionContext =
-        (SamzaExecutionContext) taskContext.getUserContext();
+        (SamzaExecutionContext) context.getApplicationContainerContext();
     final DoFnRunner<InT, FnOutT> sdkHarnessDoFnRunner =
         new SdkHarnessDoFnRunner<>(
             outputManager, stageBundleFactory, mainOutputTag, idToTupleTagMap);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
index 9597ae9..620aae2 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternals.java
@@ -64,10 +64,10 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.primitives.Ints;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
 import org.joda.time.Instant;
 
 /** {@link StateInternals} that uses Samza local {@link KeyValueStore} to manage state. */
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 5e16acd..d014a1e 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.Scheduler;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
 
   private final NavigableSet<KeyedTimerData<K>> eventTimeTimers;
   private final Coder<K> keyCoder;
-  private final TimerRegistry<KeyedTimerData<K>> timerRegistry;
+  private final Scheduler<KeyedTimerData<K>> timerRegistry;
   private final int timerBufferSize;
   private final SamzaTimerState state;
 
@@ -61,7 +61,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
 
   private SamzaTimerInternalsFactory(
       Coder<K> keyCoder,
-      TimerRegistry<KeyedTimerData<K>> timerRegistry,
+      Scheduler<KeyedTimerData<K>> timerRegistry,
       int timerBufferSize,
       String timerStateId,
       SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
@@ -75,7 +75,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
 
   static <K> SamzaTimerInternalsFactory<K> createTimerInternalFactory(
       Coder<K> keyCoder,
-      TimerRegistry<KeyedTimerData<K>> timerRegistry,
+      Scheduler<KeyedTimerData<K>> timerRegistry,
       String timerStateId,
       SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
       WindowingStrategy<?, BoundedWindow> windowingStrategy,
@@ -197,7 +197,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.register(keyedTimerData, timerData.getTimestamp().getMillis());
+          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
           break;
 
         default:
@@ -347,7 +347,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
         // since the iterator will reach to the end, it will be closed automatically
         while (iter.hasNext()) {
           final KeyedTimerData<K> keyedTimerData = iter.next();
-          timerRegistry.register(
+          timerRegistry.schedule(
               keyedTimerData, keyedTimerData.getTimerData().getTimestamp().getMillis());
         }
       }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
index a3578ed..f4c0b8f 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ConfigBuilder.java
@@ -42,7 +42,6 @@ import org.apache.samza.config.factories.PropertiesConfigFactory;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.ByteSerdeFactory;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 
 /** Builder class to generate configs for BEAM samza runner during runtime. */
@@ -78,6 +77,9 @@ public class ConfigBuilder {
           "beamPipelineOptions",
           Base64Serializer.serializeUnchecked(new SerializablePipelineOptions(options)));
 
+      // TODO: remove after we sort out Samza task wrapper
+      config.put("samza.li.task.wrapper.enabled", "false");
+
       return new MapConfig(config);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -137,9 +139,6 @@ public class ConfigBuilder {
         .put(
             JobCoordinatorConfig.JOB_COORDINATOR_FACTORY,
             PassthroughJobCoordinatorFactory.class.getName())
-        .put(
-            JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY,
-            PassthroughCoordinationUtilsFactory.class.getName())
         .put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName())
         .put(TaskConfig.COMMIT_MS(), "-1")
         .put("processor.id", "1")
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
index 61042b0..e72e601 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
@@ -44,6 +44,11 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
   @Override
   public void translate(
       Flatten.PCollections<T> transform, TransformHierarchy.Node node, TranslationContext ctx) {
+    doTranslate(transform, node, ctx);
+  }
+
+  private static <T> void doTranslate(
+      Flatten.PCollections<T> transform, TransformHierarchy.Node node, TranslationContext ctx) {
     final PCollection<T> output = ctx.getOutput(transform);
 
     final List<MessageStream<OpMessage<T>>> inputStreams = new ArrayList<>();
@@ -79,6 +84,13 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
+    doTranslatePortable(transform, pipeline, ctx);
+  }
+
+  private static <T> void doTranslatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
     final List<MessageStream<OpMessage<T>>> inputStreams = ctx.getAllInputMessageStreams(transform);
     final String outputId = ctx.getOutputId(transform);
 
@@ -91,7 +103,7 @@ class FlattenPCollectionsTranslator<T> implements TransformTranslator<Flatten.PC
   }
 
   // Merge multiple input streams into one, as this is what "flatten" is meant to do
-  private MessageStream<OpMessage<T>> mergeInputStreams(
+  private static <T> MessageStream<OpMessage<T>> mergeInputStreams(
       List<MessageStream<OpMessage<T>>> inputStreams) {
     if (inputStreams.size() == 1) {
       return Iterables.getOnlyElement(inputStreams);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
index 4599f53..13c6f11 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/GroupByKeyTranslator.java
@@ -59,6 +59,13 @@ class GroupByKeyTranslator<K, InputT, OutputT>
       PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
       TransformHierarchy.Node node,
       TranslationContext ctx) {
+    doTranslate(transform, node, ctx);
+  }
+
+  private static <K, InputT, OutputT> void doTranslate(
+      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
     final PCollection<KV<K, InputT>> input = ctx.getInput(transform);
 
     final PCollection<KV<K, OutputT>> output = ctx.getOutput(transform);
@@ -96,6 +103,13 @@ class GroupByKeyTranslator<K, InputT, OutputT>
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
+    doTranslatePortable(transform, pipeline, ctx);
+  }
+
+  private static <K, InputT, OutputT> void doTranslatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
     final MessageStream<OpMessage<KV<K, InputT>>> inputStream =
         ctx.getOneInputMessageStream(transform);
     final boolean needRepartition = ctx.getSamzaPipelineOptions().getMaxSourceParallelism() > 1;
@@ -134,7 +148,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
     ctx.registerMessageStream(ctx.getOutputId(transform), outputStream);
   }
 
-  private MessageStream<OpMessage<KV<K, OutputT>>> doTranslateGBK(
+  private static <K, InputT, OutputT> MessageStream<OpMessage<KV<K, OutputT>>> doTranslateGBK(
       MessageStream<OpMessage<KV<K, InputT>>> inputStream,
       boolean needRepartition,
       SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> reduceFn,
@@ -185,10 +199,11 @@ class GroupByKeyTranslator<K, InputT, OutputT>
   }
 
   @SuppressWarnings("unchecked")
-  private SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(
-      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
-      Pipeline pipeline,
-      KvCoder<K, InputT> kvInputCoder) {
+  private static <K, InputT, OutputT>
+      SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> getSystemReduceFn(
+          PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform,
+          Pipeline pipeline,
+          KvCoder<K, InputT> kvInputCoder) {
     if (transform instanceof GroupByKey) {
       return (SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow>)
           SystemReduceFn.buffering(kvInputCoder.getValueCoder());
@@ -203,7 +218,7 @@ class GroupByKeyTranslator<K, InputT, OutputT>
     }
   }
 
-  private boolean needRepartition(TransformHierarchy.Node node, TranslationContext ctx) {
+  private static boolean needRepartition(TransformHierarchy.Node node, TranslationContext ctx) {
     if (ctx.getPipelineOptions().getMaxSourceParallelism() == 1) {
       // Only one task will be created, no need for repartition
       return false;
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
index 197350b..baa51ab 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ImpulseTranslator.java
@@ -17,38 +17,32 @@
  */
 package org.apache.beam.runners.samza.translation;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
 
 /**
  * Translate {@link org.apache.beam.sdk.transforms.Impulse} to a samza message stream produced by
  * {@link
  * org.apache.beam.runners.samza.translation.SamzaImpulseSystemFactory.SamzaImpulseSystemConsumer}.
  */
-public class ImpulseTranslator implements TransformTranslator, TransformConfigGenerator {
+public class ImpulseTranslator implements TransformTranslator {
   @Override
   public void translatePortable(
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
-    final String outputId = ctx.getOutputId(transform);
-    ctx.registerInputMessageStream(outputId);
-  }
-
-  @Override
-  public Map<String, String> createPortableConfig(PipelineNode.PTransformNode transform) {
-    final String id = Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values());
-
-    final Map<String, String> config = new HashMap<>();
-    final String systemPrefix = "systems." + id;
-    final String streamPrefix = "streams." + id;
 
-    config.put(systemPrefix + ".samza.factory", SamzaImpulseSystemFactory.class.getName());
-    config.put(streamPrefix + ".samza.system", id);
+    final String outputId = ctx.getOutputId(transform);
+    final GenericSystemDescriptor systemDescriptor =
+        new GenericSystemDescriptor(outputId, SamzaImpulseSystemFactory.class.getName());
+    final GenericInputDescriptor<KV<?, OpMessage<byte[]>>> inputDescriptor =
+        systemDescriptor.getInputDescriptor(outputId, new NoOpSerde<>());
 
-    return config;
+    ctx.registerInputMessageStream(outputId, inputDescriptor);
   }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
index 25eddb7..e4651c4 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ParDoBoundMultiTranslator.java
@@ -81,6 +81,14 @@ class ParDoBoundMultiTranslator<InT, OutT>
       ParDo.MultiOutput<InT, OutT> transform,
       TransformHierarchy.Node node,
       TranslationContext ctx) {
+    doTranslate(transform, node, ctx);
+  }
+
+  // static for serializing anonymous functions
+  private static <InT, OutT> void doTranslate(
+      ParDo.MultiOutput<InT, OutT> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
     final PCollection<? extends InT> input = ctx.getInput(transform);
     final Map<TupleTag<?>, Coder<?>> outputCoders =
         ctx.getCurrentTransform().getOutputs().entrySet().stream()
@@ -179,6 +187,14 @@ class ParDoBoundMultiTranslator<InT, OutT>
       PipelineNode.PTransformNode transform,
       QueryablePipeline pipeline,
       PortableTranslationContext ctx) {
+    doTranslatePortable(transform, pipeline, ctx);
+  }
+
+  // static for serializing anonymous functions
+  private static <InT, OutT> void doTranslatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
     Map<String, String> outputs = transform.getTransform().getOutputsMap();
 
     final RunnerApi.ExecutableStagePayload stagePayload;
@@ -296,7 +312,7 @@ class ParDoBoundMultiTranslator<InT, OutT>
     return config;
   }
 
-  private class SideInputWatermarkFn
+  private static class SideInputWatermarkFn<InT>
       implements FlatMapFunction<OpMessage<InT>, OpMessage<InT>>,
           WatermarkFunction<OpMessage<InT>> {
 
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java
index 6aa608b..93a28b7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.samza.translation;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -24,13 +25,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
 
 /**
  * Helper that keeps the mapping from BEAM PCollection id to Samza {@link MessageStream}. It also
@@ -39,13 +51,14 @@ import org.apache.samza.operators.StreamGraph;
  */
 public class PortableTranslationContext {
   private final Map<String, MessageStream<?>> messsageStreams = new HashMap<>();
-  private final StreamGraph streamGraph;
+  private final StreamApplicationDescriptor appDescriptor;
   private final SamzaPipelineOptions options;
   private int topologicalId;
   private final Set<String> registeredInputStreams = new HashSet<>();
 
-  public PortableTranslationContext(StreamGraph streamGraph, SamzaPipelineOptions options) {
-    this.streamGraph = streamGraph;
+  public PortableTranslationContext(
+      StreamApplicationDescriptor appDescriptor, SamzaPipelineOptions options) {
+    this.appDescriptor = appDescriptor;
     this.options = options;
   }
 
@@ -93,33 +106,64 @@ public class PortableTranslationContext {
     messsageStreams.put(id, stream);
   }
 
-  /** Register an input stream, using the PCollection id as the config id. */
-  public void registerInputMessageStream(String id) {
-    registerInputMessageStreamWithStreamId(id, id);
+  /** Get output stream by output descriptor. */
+  public <OutT> OutputStream<OutT> getOutputStream(OutputDescriptor<OutT, ?> outputDescriptor) {
+    return appDescriptor.getOutputStream(outputDescriptor);
   }
 
-  /** Get output stream by stream id. */
-  public <T> OutputStream<T> getOutputStreamById(String outputStreamId) {
-    return streamGraph.getOutputStream(outputStreamId);
-  }
-
-  /**
-   * Register an input stream with certain config id.
-   *
-   * @param id id of the PCollection in the input/output of PTransform
-   * @param streamId samza stream id which user can use to customize the stream level config
-   */
-  public <T> void registerInputMessageStreamWithStreamId(String id, String streamId) {
+  /** Register an input stream with certain config id. */
+  public <T> void registerInputMessageStream(
+      String id, InputDescriptor<KV<?, OpMessage<T>>, ?> inputDescriptor) {
     // we want to register it with the Samza graph only once per i/o stream
+    final String streamId = inputDescriptor.getStreamId();
     if (registeredInputStreams.contains(streamId)) {
       return;
     }
     final MessageStream<OpMessage<T>> stream =
-        streamGraph
-            .<org.apache.samza.operators.KV<?, OpMessage<T>>>getInputStream(streamId)
-            .map(org.apache.samza.operators.KV::getValue);
+        appDescriptor.getInputStream(inputDescriptor).map(org.apache.samza.operators.KV::getValue);
 
     registerMessageStream(id, stream);
     registeredInputStreams.add(streamId);
   }
+
+  public WindowedValue.WindowedValueCoder instantiateCoder(
+      String collectionId, RunnerApi.Components components) {
+    PipelineNode.PCollectionNode collectionNode =
+        PipelineNode.pCollection(collectionId, components.getPcollectionsOrThrow(collectionId));
+    try {
+      return (WindowedValue.WindowedValueCoder)
+          WireCoders.instantiateRunnerWireCoder(collectionNode, components);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public WindowingStrategy<?, BoundedWindow> getPortableWindowStrategy(
+      PipelineNode.PTransformNode transform, QueryablePipeline pipeline) {
+    String inputId = Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(pipeline.getComponents());
+
+    RunnerApi.WindowingStrategy windowingStrategyProto =
+        pipeline
+            .getComponents()
+            .getWindowingStrategiesOrThrow(
+                pipeline.getComponents().getPcollectionsOrThrow(inputId).getWindowingStrategyId());
+
+    WindowingStrategy<?, ?> windowingStrategy;
+    try {
+      windowingStrategy =
+          WindowingStrategyTranslation.fromProto(windowingStrategyProto, rehydratedComponents);
+    } catch (Exception e) {
+      throw new IllegalStateException(
+          String.format(
+              "Unable to hydrate GroupByKey windowing strategy %s.", windowingStrategyProto),
+          e);
+    }
+
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<?, BoundedWindow> ret =
+        (WindowingStrategy<?, BoundedWindow>) windowingStrategy;
+    return ret;
+  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
index cf8fa18..c5dcd9d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReadTranslator.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.runners.samza.translation;
 
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.core.serialization.Base64Serializer;
 import org.apache.beam.runners.samza.adapter.BoundedSourceSystem;
 import org.apache.beam.runners.samza.adapter.UnboundedSourceSystem;
+import org.apache.beam.runners.samza.runtime.OpMessage;
 import org.apache.beam.runners.samza.util.SamzaCoders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.BoundedSource;
@@ -32,14 +32,19 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
 
 /**
  * Translates {@link org.apache.beam.sdk.io.Read} to Samza input {@link
  * org.apache.samza.operators.MessageStream}.
  */
-public class ReadTranslator<T>
-    implements TransformTranslator<PTransform<PBegin, PCollection<T>>>,
-        TransformConfigGenerator<PTransform<PBegin, PCollection<T>>> {
+public class ReadTranslator<T> implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
   @Override
   public void translate(
@@ -47,39 +52,40 @@ public class ReadTranslator<T>
       TransformHierarchy.Node node,
       TranslationContext ctx) {
     final PCollection<T> output = ctx.getOutput(transform);
-    ctx.registerInputMessageStream(output);
-  }
-
-  @Override
-  public Map<String, String> createConfig(
-      PTransform<PBegin, PCollection<T>> transform,
-      TransformHierarchy.Node node,
-      ConfigContext ctx) {
-    final String id = ctx.getOutputId(node);
-    final PCollection<T> output = ctx.getOutput(transform);
     final Coder<WindowedValue<T>> coder = SamzaCoders.of(output);
     final Source<?> source =
         transform instanceof Read.Unbounded
             ? ((Read.Unbounded) transform).getSource()
             : ((Read.Bounded) transform).getSource();
+    final String id = ctx.getIdForPValue(output);
 
-    final Map<String, String> config = new HashMap<>();
-    final String systemPrefix = "systems." + id;
-    final String streamPrefix = "streams." + id;
-
-    config.put(systemPrefix + ".source", Base64Serializer.serializeUnchecked(source));
-    config.put(systemPrefix + ".coder", Base64Serializer.serializeUnchecked(coder));
-    config.put(systemPrefix + ".stepName", node.getFullName());
+    // Create system descriptor
+    final GenericSystemDescriptor systemDescriptor;
+    if (source instanceof BoundedSource) {
+      systemDescriptor =
+          new GenericSystemDescriptor(id, BoundedSourceSystem.Factory.class.getName());
+    } else {
+      systemDescriptor =
+          new GenericSystemDescriptor(id, UnboundedSourceSystem.Factory.class.getName());
+    }
 
-    config.put(streamPrefix + ".samza.system", id);
+    final Map<String, String> systemConfig =
+        ImmutableMap.of(
+            "source", Base64Serializer.serializeUnchecked(source),
+            "coder", Base64Serializer.serializeUnchecked(coder),
+            "stepName", node.getFullName());
+    systemDescriptor.withSystemConfigs(systemConfig);
 
+    // Create stream descriptor
+    @SuppressWarnings("unchecked")
+    final Serde<KV<?, OpMessage<T>>> kvSerde =
+        (Serde) KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>());
+    final GenericInputDescriptor<KV<?, OpMessage<T>>> inputDescriptor =
+        systemDescriptor.getInputDescriptor(id, kvSerde);
     if (source instanceof BoundedSource) {
-      config.put(streamPrefix + ".samza.bounded", "true");
-      config.put(systemPrefix + ".samza.factory", BoundedSourceSystem.Factory.class.getName());
-    } else {
-      config.put(systemPrefix + ".samza.factory", UnboundedSourceSystem.Factory.class.getName());
+      inputDescriptor.isBounded();
     }
 
-    return config;
+    ctx.registerInputMessageStream(output, inputDescriptor);
   }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishViewTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishViewTranslator.java
index 74c157a..76612e8 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishViewTranslator.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPublishViewTranslator.java
@@ -29,11 +29,20 @@ import org.apache.samza.operators.MessageStream;
 /** Translates {@link SamzaPublishView} to a view {@link MessageStream} as side input. */
 class SamzaPublishViewTranslator<ElemT, ViewT>
     implements TransformTranslator<SamzaPublishView<ElemT, ViewT>> {
+
   @Override
   public void translate(
       SamzaPublishView<ElemT, ViewT> transform,
       TransformHierarchy.Node node,
       TranslationContext ctx) {
+    doTranslate(transform, node, ctx);
+  }
+
+  private static <ElemT, ViewT> void doTranslate(
+      SamzaPublishView<ElemT, ViewT> transform,
+      TransformHierarchy.Node node,
+      TranslationContext ctx) {
+
     final PCollection<List<ElemT>> input = ctx.getInput(transform);
     final MessageStream<OpMessage<Iterable<ElemT>>> inputStream = ctx.getMessageStream(input);
     @SuppressWarnings("unchecked")
@@ -51,9 +60,9 @@ class SamzaPublishViewTranslator<ElemT, ViewT>
             : elementStream.broadcast(
                 SamzaCoders.toSerde(elementCoder), "view-" + ctx.getCurrentTopologicalId());
 
+    final String viewId = ctx.getViewId(transform.getView());
     final MessageStream<OpMessage<Iterable<ElemT>>> outputStream =
-        broadcastStream.map(
-            element -> OpMessage.ofSideInput(ctx.getViewId(transform.getView()), element));
+        broadcastStream.map(element -> OpMessage.ofSideInput(viewId, element));
 
     ctx.registerViewStream(transform.getView(), outputStream);
   }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
index 27d16c8..4120ef0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/TranslationContext.java
@@ -19,22 +19,43 @@ package org.apache.beam.runners.samza.translation;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.runners.samza.runtime.OpMessage;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.WatermarkMessage;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.TableDescriptor;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,57 +66,61 @@ import org.slf4j.LoggerFactory;
  */
 public class TranslationContext {
   public static final Logger LOG = LoggerFactory.getLogger(TranslationContext.class);
-  private final StreamGraph streamGraph;
+  private final StreamApplicationDescriptor appDescriptor;
   private final Map<PValue, MessageStream<?>> messsageStreams = new HashMap<>();
   private final Map<PCollectionView<?>, MessageStream<?>> viewStreams = new HashMap<>();
   private final Map<PValue, String> idMap;
   private final Map<String, MessageStream> registeredInputStreams = new HashMap<>();
   private final Map<String, Table> registeredTables = new HashMap<>();
-  private final PValue dummySource;
   private final SamzaPipelineOptions options;
 
   private AppliedPTransform<?, ?, ?> currentTransform;
   private int topologicalId;
 
   public TranslationContext(
-      StreamGraph streamGraph,
+      StreamApplicationDescriptor appDescriptor,
       Map<PValue, String> idMap,
-      SamzaPipelineOptions options,
-      PValue dummySource) {
-    this.streamGraph = streamGraph;
+      SamzaPipelineOptions options) {
+    this.appDescriptor = appDescriptor;
     this.idMap = idMap;
     this.options = options;
-    this.dummySource = dummySource;
   }
 
-  public <OutT> void registerInputMessageStream(PValue pvalue) {
-    // We only register dummySource if it is actually used (determined by a call to getDummyStream).
-    if (!pvalue.equals(dummySource)) {
-      doRegisterInputMessageStream(pvalue, getIdForPValue(pvalue));
-    }
-  }
+  public <OutT> void registerInputMessageStream(
+      PValue pvalue,
+      InputDescriptor<org.apache.samza.operators.KV<?, OpMessage<OutT>>, ?> inputDescriptor) {
+    // we want to register it with the Samza graph only once per i/o stream
+    final String streamId = inputDescriptor.getStreamId();
+    if (registeredInputStreams.containsKey(streamId)) {
+      MessageStream<OpMessage<OutT>> messageStream = registeredInputStreams.get(streamId);
+      LOG.info(
+          String.format(
+              "Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.",
+              streamId, messageStream, pvalue));
+      registerMessageStream(pvalue, messageStream);
 
-  public <OutT> void registerInputMessageStreamById(PValue pvalue, String streamId) {
-    // We only register dummySource if it is actually used (determined by a call to getDummyStream).
-    if (!pvalue.equals(dummySource)) {
-      doRegisterInputMessageStream(pvalue, streamId);
+      return;
     }
+    @SuppressWarnings("unchecked")
+    final MessageStream<OpMessage<OutT>> typedStream =
+        getValueStream(appDescriptor.getInputStream(inputDescriptor));
+
+    registerMessageStream(pvalue, typedStream);
+    registeredInputStreams.put(streamId, typedStream);
   }
 
   public <OutT> void registerMessageStream(PValue pvalue, MessageStream<OpMessage<OutT>> stream) {
     if (messsageStreams.containsKey(pvalue)) {
       throw new IllegalArgumentException("Stream already registered for pvalue: " + pvalue);
     }
-
     messsageStreams.put(pvalue, stream);
   }
 
+  // Add a dummy stream for use in special cases (TestStream, empty flatten)
   public MessageStream<OpMessage<String>> getDummyStream() {
-    if (!messsageStreams.containsKey(dummySource)) {
-      doRegisterInputMessageStream(dummySource, getIdForPValue(dummySource));
-    }
-
-    return getMessageStream(dummySource);
+    InputDescriptor<OpMessage<String>, ?> dummyInput =
+        createDummyStreamDescriptor(UUID.randomUUID().toString());
+    return appDescriptor.getInputStream(dummyInput);
   }
 
   public <OutT> MessageStream<OpMessage<OutT>> getMessageStream(PValue pvalue) {
@@ -177,43 +202,53 @@ public class TranslationContext {
     return this.options;
   }
 
-  public <OutT> OutputStream<OutT> getOutputStreamById(String outputStreamId) {
-    return streamGraph.getOutputStream(outputStreamId);
+  public <OutT> OutputStream<OutT> getOutputStream(OutputDescriptor<OutT, ?> outputDescriptor) {
+    return appDescriptor.getOutputStream(outputDescriptor);
   }
 
   @SuppressWarnings("unchecked")
   public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
     return registeredTables.computeIfAbsent(
-        tableDesc.getTableId(), id -> streamGraph.getTable(tableDesc));
+        tableDesc.getTableId(), id -> appDescriptor.getTable(tableDesc));
   }
 
-  private <OutT> void doRegisterInputMessageStream(PValue pvalue, String streamId) {
-    // we want to register it with the Samza graph only once per i/o stream
-    if (registeredInputStreams.containsKey(streamId)) {
-      MessageStream<OpMessage<OutT>> messageStream = registeredInputStreams.get(streamId);
-      LOG.info(
-          String.format(
-              "Stream id %s has already been mapped to %s stream. Mapping %s to the same message stream.",
-              streamId, messageStream, pvalue));
-      registerMessageStream(pvalue, messageStream);
-
-      return;
-    }
-    @SuppressWarnings("unchecked")
-    final MessageStream<OpMessage<OutT>> typedStream =
-        streamGraph
-            .<org.apache.samza.operators.KV<?, OpMessage<OutT>>>getInputStream(streamId)
-            .map(org.apache.samza.operators.KV::getValue);
-
-    registerMessageStream(pvalue, typedStream);
-    registeredInputStreams.put(streamId, typedStream);
+  private static <T> MessageStream<T> getValueStream(
+      MessageStream<org.apache.samza.operators.KV<?, T>> input) {
+    return input.map(org.apache.samza.operators.KV::getValue);
   }
 
-  private String getIdForPValue(PValue pvalue) {
+  public String getIdForPValue(PValue pvalue) {
     final String id = idMap.get(pvalue);
     if (id == null) {
       throw new IllegalArgumentException("No id mapping for value: " + pvalue);
     }
     return id;
   }
+
+  /** The dummy stream created will only be used in Beam tests. */
+  private static InputDescriptor<OpMessage<String>, ?> createDummyStreamDescriptor(String id) {
+    final GenericSystemDescriptor dummySystem =
+        new GenericSystemDescriptor(id, InMemorySystemFactory.class.getName());
+    final GenericInputDescriptor<OpMessage<String>> dummyInput =
+        dummySystem.getInputDescriptor(id, new NoOpSerde<>());
+    dummyInput.withOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
+    final Config config = new MapConfig(dummyInput.toConfig(), dummySystem.toConfig());
+    final SystemFactory factory = new InMemorySystemFactory();
+    final StreamSpec dummyStreamSpec = new StreamSpec(id, id, id, 1);
+    factory.getAdmin(id, config).createStream(dummyStreamSpec);
+
+    final SystemProducer producer = factory.getProducer(id, config, null);
+    final SystemStream sysStream = new SystemStream(id, id);
+    final Consumer<Object> sendFn =
+        (msg) -> {
+          producer.send(id, new OutgoingMessageEnvelope(sysStream, 0, null, msg));
+        };
+    final WindowedValue<String> windowedValue =
+        WindowedValue.timestampedValueInGlobalWindow("dummy", new Instant());
+
+    sendFn.accept(OpMessage.ofElement(windowedValue));
+    sendFn.accept(new WatermarkMessage(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+    sendFn.accept(new EndOfStreamMessage(null));
+    return dummyInput;
+  }
 }
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java
index bb8066c..964f0fe 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/BoundedSourceSystemTest.java
@@ -55,6 +55,7 @@ public class BoundedSourceSystemTest {
 
   // A reasonable time to wait to get all messages from the bounded source assuming no blocking.
   private static final long DEFAULT_TIMEOUT_MILLIS = 1000;
+  private static final String NULL_STRING = null;
 
   @Test
   public void testConsumerStartStop() throws IOException, InterruptedException {
@@ -215,9 +216,9 @@ public class BoundedSourceSystemTest {
 
     final BoundedSourceSystem.Consumer<String> consumer = createConsumer(source, 3);
 
-    consumer.register(ssp(0), null);
-    consumer.register(ssp(1), null);
-    consumer.register(ssp(2), null);
+    consumer.register(ssp(0), NULL_STRING);
+    consumer.register(ssp(1), NULL_STRING);
+    consumer.register(ssp(2), NULL_STRING);
     consumer.start();
 
     final Set<String> offsets = new HashSet<>();
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
index 9b7b217..53e7ed7 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/adapter/UnboundedSourceSystemTest.java
@@ -56,6 +56,7 @@ public class UnboundedSourceSystemTest {
   // A reasonable time to wait to get all messages from the source assuming no blocking.
   private static final long DEFAULT_TIMEOUT_MILLIS = 1000;
   private static final long DEFAULT_WATERMARK_TIMEOUT_MILLIS = 1000;
+  private static final String NULL_STRING = null;
 
   private static final SystemStreamPartition DEFAULT_SSP =
       new SystemStreamPartition("default-system", "default-system", new Partition(0));
@@ -86,7 +87,7 @@ public class UnboundedSourceSystemTest {
     final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
         createConsumer(source);
 
-    consumer.register(DEFAULT_SSP, null);
+    consumer.register(DEFAULT_SSP, NULL_STRING);
     consumer.start();
     assertEquals(
         Arrays.asList(
@@ -110,7 +111,7 @@ public class UnboundedSourceSystemTest {
     final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
         createConsumer(source);
 
-    consumer.register(DEFAULT_SSP, null);
+    consumer.register(DEFAULT_SSP, NULL_STRING);
     consumer.start();
     assertEquals(
         Arrays.asList(
@@ -133,7 +134,7 @@ public class UnboundedSourceSystemTest {
     final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
         createConsumer(source);
 
-    consumer.register(DEFAULT_SSP, null);
+    consumer.register(DEFAULT_SSP, NULL_STRING);
     consumer.start();
     assertEquals(
         Arrays.asList(
@@ -161,7 +162,7 @@ public class UnboundedSourceSystemTest {
     final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
         createConsumer(source);
 
-    consumer.register(DEFAULT_SSP, null);
+    consumer.register(DEFAULT_SSP, NULL_STRING);
     consumer.start();
     assertEquals(
         Arrays.asList(
@@ -193,7 +194,7 @@ public class UnboundedSourceSystemTest {
     final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
         createConsumer(source);
 
-    consumer.register(DEFAULT_SSP, null);
+    consumer.register(DEFAULT_SSP, NULL_STRING);
     consumer.start();
     // consume to the first watermark
     assertEquals(
@@ -223,7 +224,7 @@ public class UnboundedSourceSystemTest {
     final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
         createConsumer(source);
 
-    consumer.register(DEFAULT_SSP, null);
+    consumer.register(DEFAULT_SSP, NULL_STRING);
     consumer.start();
     expectWrappedException(
         exception,
@@ -271,7 +272,7 @@ public class UnboundedSourceSystemTest {
     final UnboundedSourceSystem.Consumer<String, TestCheckpointMark> consumer =
         createConsumer(source);
 
-    consumer.register(DEFAULT_SSP, null);
+    consumer.register(DEFAULT_SSP, NULL_STRING);
     consumer.start();
     assertEquals(
         Collections.singletonList(createElementMessage(DEFAULT_SSP, offset(0), "before", now)),
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
index ca085c2..f75a758 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaStoreStateInternalsTest.java
@@ -56,8 +56,10 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
-import org.apache.samza.container.SamzaContainerContext;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.storage.StorageEngineFactory;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
@@ -194,7 +196,9 @@ public class SamzaStoreStateInternalsTest implements Serializable {
         File storeDir,
         MetricsRegistry registry,
         SystemStreamPartition changeLogSystemStreamPartition,
-        SamzaContainerContext containerContext) {
+        JobContext jobContext,
+        ContainerContext containerContext,
+        StorageEngineFactory.StoreMode readWrite) {
       KeyValueStoreMetrics metrics = new KeyValueStoreMetrics(storeName, registry);
       return new TestStore(metrics);
     }
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 6e8b9be..25ef315 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -39,11 +39,12 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.storage.kv.KeyValueStoreMetrics;
 import org.apache.samza.storage.kv.RocksDbKeyValueStore;
-import org.apache.samza.task.TaskContext;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.rocksdb.FlushOptions;
@@ -73,7 +74,7 @@ public class SamzaTimerInternalsFactoryTest {
   private static SamzaStoreStateInternals.Factory<?> createNonKeyedStateInternalsFactory(
       SamzaPipelineOptions pipelineOptions, RocksDbKeyValueStore store) {
     final TaskContext context = mock(TaskContext.class);
-    when(context.getStore(anyString())).thenReturn(store);
+    when(context.getStore(anyString())).thenReturn((KeyValueStore) store);
     final TupleTag<?> mainOutputTag = new TupleTag<>("output");
 
     return SamzaStoreStateInternals.createStateInternalFactory(
@@ -81,7 +82,7 @@ public class SamzaTimerInternalsFactoryTest {
   }
 
   private static SamzaTimerInternalsFactory<String> createTimerInternalsFactory(
-      TimerRegistry<KeyedTimerData<String>> timerRegistry,
+      Scheduler<KeyedTimerData<String>> timerRegistry,
       String timerStateId,
       SamzaPipelineOptions pipelineOptions,
       RocksDbKeyValueStore store) {
@@ -98,11 +99,11 @@ public class SamzaTimerInternalsFactoryTest {
         pipelineOptions);
   }
 
-  private static class TestTimerRegistry implements TimerRegistry<KeyedTimerData<String>> {
+  private static class TestTimerRegistry implements Scheduler<KeyedTimerData<String>> {
     private final List<KeyedTimerData<String>> timers = new ArrayList<>();
 
     @Override
-    public void register(KeyedTimerData<String> key, long timestamp) {
+    public void schedule(KeyedTimerData<String> key, long timestamp) {
       timers.add(key);
     }